refactor(proj) sync 4.3.0 plugins with tracked files

This commit is contained in:
Zaiming Shi 2020-12-05 02:43:04 +01:00
parent 73d02beace
commit 0fb5fb31a5
73 changed files with 1431 additions and 3091 deletions

View File

@ -5,7 +5,7 @@ PROFILE ?= emqx
PROFILES := emqx emqx-edge
PKG_PROFILES := emqx-pkg emqx-edge-pkg
export REBAR_GIT_CLONE_OPTIONS=--depth=1
export REBAR_GIT_CLONE_OPTIONS += --depth=1
.PHONY: default
default: $(REBAR) $(PROFILE)

View File

@ -1 +1,27 @@
{deps, []}.
{deps, []}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps,
[{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}
]}
]}
]}.

View File

@ -1,3 +1,24 @@
{deps,
[{jwerl, {git, "https://github.com/emqx/jwerl.git", {branch, "1.1.1"}}}
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps, [{emqx_ct_helpers, {git, "http://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}}]}
]}
]}.

View File

@ -1,26 +0,0 @@
version: '3'
services:
erlang:
image: erlang:22.1
volumes:
- ../:/emqx_auth_ldap
networks:
- emqx_bridge
depends_on:
- ldap_server
tty: true
ldap_server:
build: ./emqx-ldap
image: emqx-ldap:1.0
restart: always
ports:
- 389:389
- 636:636
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -1,26 +0,0 @@
FROM buildpack-deps:stretch
ENV VERSION=2.4.50
RUN apt-get update && apt-get install -y groff groff-base
RUN wget ftp://ftp.openldap.org/pub/OpenLDAP/openldap-release/openldap-${VERSION}.tgz \
&& gunzip -c openldap-${VERSION}.tgz | tar xvfB - \
&& cd openldap-${VERSION} \
&& ./configure && make depend && make && make install \
&& cd .. && rm -rf openldap-${VERSION}
COPY ./slapd.conf /usr/local/etc/openldap/slapd.conf
COPY ./emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
COPY ./emqx.schema /usr/local/etc/openldap/schema/emqx.schema
COPY ./*.pem /usr/local/etc/openldap/
RUN mkdir -p /usr/local/etc/openldap/data \
&& slapadd -l /usr/local/etc/openldap/schema/emqx.io.ldif -f /usr/local/etc/openldap/slapd.conf
WORKDIR /usr/local/etc/openldap
EXPOSE 389 636
ENTRYPOINT ["/usr/local/libexec/slapd", "-h", "ldap:/// ldaps:///", "-d", "3", "-f", "/usr/local/etc/openldap/slapd.conf"]
CMD []

View File

@ -1,16 +0,0 @@
include /usr/local/etc/openldap/schema/core.schema
include /usr/local/etc/openldap/schema/cosine.schema
include /usr/local/etc/openldap/schema/inetorgperson.schema
include /usr/local/etc/openldap/schema/ppolicy.schema
include /usr/local/etc/openldap/schema/emqx.schema
TLSCACertificateFile /usr/local/etc/openldap/cacert.pem
TLSCertificateFile /usr/local/etc/openldap/cert.pem
TLSCertificateKeyFile /usr/local/etc/openldap/key.pem
database bdb
suffix "dc=emqx,dc=io"
rootdn "cn=root,dc=emqx,dc=io"
rootpw {SSHA}eoF7NhNrejVYYyGHqnt+MdKNBh4r1w3W
directory /usr/local/etc/openldap/data

View File

@ -1,3 +1,25 @@
{deps,
[{eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}}
]}.
{profiles,
[{test,
[{deps, [{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}}]}
]}
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.

View File

@ -1,2 +1,32 @@
{deps,
[{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.7"}}}]}.
[{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.7"}}}]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
compressed,
{parse_transform}
]}.
{overrides, [{add, [{erl_opts, [compressed]}]}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions
]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps,
[{emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helper", {tag, "1.2.2"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}
]},
{erl_opts, [debug_info]}
]}
]}.

View File

@ -2,3 +2,33 @@
[
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.6.1"}}}
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
compressed,
{parse_transform}
]}.
{overrides, [{add, [{erl_opts, [compressed]}]}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions
]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps,
[{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}
]},
{erl_opts, [debug_info]}
]}
]}.

View File

@ -1,30 +0,0 @@
version: '3'
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_pgsql
networks:
- emqx_bridge
depends_on:
- pgsql_server
tty: true
pgsql_server:
build:
context: ./pgsql
args:
BUILD_FROM: postgres:${PGSQL_TAG}
image: emqx-pgsql
restart: always
environment:
POSTGRES_PASSWORD: public
POSTGRES_USER: root
POSTGRES_DB: mqtt
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -1,8 +0,0 @@
ARG BUILD_FROM=postgres:11
FROM ${BUILD_FROM}
COPY pg.conf /etc/postgresql/postgresql.conf
COPY server-cert.pem /etc/postgresql/server-cert.pem
COPY server-key.pem /etc/postgresql/server-key.pem
RUN chown -R postgres:postgres /etc/postgresql \
&& chmod 600 /etc/postgresql/*.pem
CMD ["-c", "config_file=/etc/postgresql/postgresql.conf"]

View File

@ -1,3 +1,31 @@
{deps,
[{epgsql, {git, "https://github.com/epgsql/epgsql", {tag, "4.4.0"}}}
]}.
]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
compressed
]}.
{overrides, [{add, [{erl_opts, [compressed]}]}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions
]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps,
[{emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helper", {branch, "1.2.2"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}
]},
{erl_opts, [debug_info]}
]}
]}.

View File

@ -1,26 +0,0 @@
version: '3'
services:
erlang:
image: erlang:22.1
volumes:
- ../:/emqx_auth_redis
networks:
- emqx_bridge
depends_on:
- redis_server
tty: true
redis_server:
build:
context: ./emqx-redis
args:
BUILD_FROM: redis:${REDIS_TAG}
image: emqx-redis
restart: always
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -1,4 +0,0 @@
ARG BUILD_FROM=redis:5
FROM ${BUILD_FROM}
COPY redis.conf /usr/local/etc/redis/redis.conf
CMD [ "redis-server", "/usr/local/etc/redis/redis.conf" ]

File diff suppressed because it is too large Load Diff

View File

@ -33,6 +33,40 @@
hidden
]}.
{mapping, "auth.redis.ssl", "emqx_auth_redis.options", [
{default, off},
{datatype, flag}
]}.
{mapping, "auth.redis.cafile", "emqx_auth_redis.options", [
{default, ""},
{datatype, string}
]}.
{mapping, "auth.redis.certfile", "emqx_auth_redis.options", [
{default, ""},
{datatype, string}
]}.
{mapping, "auth.redis.keyfile", "emqx_auth_redis.options", [
{default, ""},
{datatype, string}
]}.
{translation, "emqx_auth_redis.options", fun(Conf) ->
Ssl = cuttlefish:conf_get("auth.redis.ssl", Conf, false),
case Ssl of
true ->
CA = cuttlefish:conf_get("auth.redis.cafile", Conf),
Cert = cuttlefish:conf_get("auth.redis.certfile", Conf),
Key = cuttlefish:conf_get("auth.redis.keyfile", Conf),
[{options, [{ssl_options, [{cacertfile, CA},
{certfile, Cert},
{keyfile, Key}]}]}];
_ -> [{options, []}]
end
end}.
{translation, "emqx_auth_redis.server", fun(Conf) ->
Fun = fun(S) ->
case string:split(S, ":", trailing) of
@ -103,4 +137,3 @@ end}.
_ -> plain
end
end}.

View File

@ -1,3 +1,31 @@
{deps,
[{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "v0.6.1"}}}
]}.
[{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.2"}}}
]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
compressed
]}.
{overrides, [{add, [{erl_opts, [compressed]}]}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions
]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps,
[{emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helper", {tag, "1.2.2"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}
]},
{erl_opts, [debug_info]}
]}
]}.

View File

@ -41,13 +41,13 @@ connect(Opts) ->
eredis_sentinel:start_link(get_value(servers, Opts)),
"sentinel:" ++ Sentinel
end,
case eredis:start_link(
Host,
get_value(port, Opts, 6379),
get_value(database, Opts),
get_value(password, Opts),
no_reconnect
) of
case eredis:start_link(Host,
get_value(port, Opts, 6379),
get_value(database, Opts, 0),
get_value(password, Opts, ""),
3000,
5000,
get_value(options, Opts, [])) of
{ok, Pid} -> {ok, Pid};
{error, Reason = {connection_error, _}} ->
?LOG(error, "[Redis] Can't connect to Redis server: Connection refused."),

View File

@ -32,11 +32,12 @@ init([]) ->
{ok, {{one_for_one, 10, 100}, pool_spec(Server)}}.
pool_spec(Server) ->
Options = application:get_env(?APP, options, []),
case proplists:get_value(type, Server) of
cluster ->
eredis_cluster:start_pool(?APP, Server),
eredis_cluster:start_pool(?APP, Server ++ Options),
[];
_ ->
[ecpool:pool_spec(?APP, ?APP, emqx_auth_redis_cli, Server)]
[ecpool:pool_spec(?APP, ?APP, emqx_auth_redis_cli, Server ++ Options)]
end.

View File

@ -1 +1,27 @@
{deps, []}.
{deps, []}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx, emqx_bridge_mqtt]}
]}.
{profiles,
[{test,
[{deps,
[{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}}]}
]}
]}.

View File

@ -556,7 +556,7 @@
default => <<"">>,
title => #{en => <<"Payload Template">>,
zh => <<"消息内容模板"/utf8>>},
description => #{en => <<"The payload template, variable interpolation is supported. If using empty template (default), then the payload will be all the available vars in JOSN format">>,
description => #{en => <<"The payload template, variable interpolation is supported. If using empty template (default), then the payload will be all the available vars in JSON format">>,
zh => <<"消息内容模板,支持变量。若使用空模板(默认),消息内容为 JSON 格式的所有字段"/utf8>>}
}
},

View File

@ -1,4 +1,28 @@
{deps,
[
{gen_coap, {git, "https://github.com/emqx/gen_coap", {tag, "v0.3.0"}}}
]}.
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps,
[{er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0"}}},
{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}}
]}
]}
]}.

View File

@ -1 +1,23 @@
{deps, []}.
{deps, []}.
{profiles,
[{test,
[{deps,
[{emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helper", {branch, "1.2.2"}}}
]}
]}
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{d, 'APPLICATION', emqx}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.

View File

@ -1,3 +0,0 @@
src/emqx_exhook_pb.erl
src/emqx_exhook_v_1_hook_provider_bhvr.erl
src/emqx_exhook_v_1_hook_provider_client.erl

View File

@ -1,39 +1,75 @@
# emqx_exhook
# emqx_extension_hook
The `emqx_exhook` extremly enhance the extensibility for EMQ X. It allow using an others programming language to mount the hooks intead of erlang.
The `emqx_extension_hook` extremly enhance the extensibility for EMQ X. It allow using an others programming language to mount the hooks intead of erlang.
## Feature
- [x] Based on gRPC, it brings a very wide range of applicability
- [x] Support `python` and `java`.
- [x] Support all hooks of emqx.
- [x] Allows you to use the return value to extend emqx behavior.
We temporarily no plans to support other languages. Plaease open a issue if you have to use other programming languages.
## Architecture
```
EMQ X Third-party Runtime
+========================+ +========+==========+
| ExHook | | | |
| +----------------+ | gRPC | gRPC | User's |
| | gPRC Client | ------------------> | Server | Codes |
| +----------------+ | (HTTP/2) | | |
| | | | |
+========================+ +========+==========+
EMQ X Third-party Runtimes
+========================+ +====================+
| Extension | | |
| +----------------+ | Hooks | Python scripts / |
| | Drivers | ------------------> | Java Classes / |
| +----------------+ | (pipe) | Others ... |
| | | |
+========================+ +====================+
```
## Usage
## Drivers
### gRPC service
### Python
See: `priv/protos/exhook.proto`
***Requirements:***
### CLI
- It requires the emqx hosted machine has Python3 Runtimes (not support python2)
- The `python3` executable commands in your shell
## Example
***Examples:***
## Recommended gRPC Framework
See `test/scripts/main.py`
See: https://github.com/grpc-ecosystem/awesome-grpc
### Java
## Thanks
***Requirements:***
- [grpcbox](https://github.com/tsloughter/grpcbox)
- It requires the emqx hosted machine has Java 8+ Runtimes
- An executable commands in your shell, i,g: `java`
***Examples:***
See `test/scripts/Main.java`
## Configurations
| Name | Data Type | Options | Default | Description |
| ------------------- | --------- | ------------------------------------- | ---------------- | -------------------------------- |
| drivers | Enum | `python3`<br />`java` | `python3` | Drivers type |
| <type>.path | String | - | `data/extension` | The codes/library search path |
| <type>.call_timeout | Duration | - | `5s` | Function call timeout |
| <type>.pool_size | Integer | - | `8` | The pool size for the driver |
| <type>.init_module | String | - | main | The module name for initial call |
## SDK
See `sdk/README.md`
## Known Issues or TODOs
**Configurable Log System**
- use stderr to print logs to the emqx console instead of stdout. An alternative is to print the logs to a file.
- The Java driver can not redirect the `stderr` stream to erlang vm on Windows platform.
## Reference
- [erlport](https://github.com/hdima/erlport)
- [Eexternal Term Format](http://erlang.org/doc/apps/erts/erl_ext_dist.html)
- [The Ports Tutorial of Erlang](http://erlang.org/doc/tutorial/c_port.html)

View File

@ -2,115 +2,254 @@
## 动机
在 EMQ X Broker v4.1-v4.2 中,我们发布了 2 个插件来扩展 emqx 的编程能力
增强系统的扩展性。包含的目的有
1. `emqx-extension-hook` 提供了使用 Java, Python 向 Broker 挂载钩子的功能
2. `emqx-exproto` 提供了使用 JavaPython 编写用户自定义协议接入插件的功能
- 完全支持各种钩子,能够根据其返回值修改 EMQ X 或者 Client 的行为。
- 例如 `auth/acl`:可以查询数据库或者执行某种算法校验操作权限。然后返回 `false` 表示 `认证/ACL` 失败。
- 例如 `message.publish`:可以解析 `消息/主题` 并将其存储至数据库中。
但在后续的支持中发现许多难以处理的问题:
- 支持多种语言的扩展;并包含该语言的示例程序。
- python
- webhook
- Java
- Lua
- cgo.....
- 热操作
- 允许在插件运行过程中,添加和移除 `Driver`
1. 有大量的编程语言需要支持,需要编写和维护如 Go, JavaScript, Lua.. 等语言的驱动。
2. `erlport` 使用的操作系统的管道进行通信,这让用户代码只能部署在和 emqx 同一个操作系统上。部署方式受到了极大的限制。
3. 用户程序的启动参数直接打包到 Broker 中,导致用户开发无法实时的进行调试,单步跟踪等。
4. `erlport` 会占用 `stdin` `stdout`
- 需要 CLI ,甚至 API 来管理 `Driver`
因此,我们计划重构这部分的实现,其中主要的内容是:
1. 使用 `gRPC` 替换 `erlport`
2. 将 `emqx-extension-hook` 重命名为 `emqx-exhook`
旧版本的设计参考:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md)
注:`message` 类钩子仅包括在企业版中。
## 设计
架构如下:
```
EMQ X
+========================+ +========+==========+
| ExHook | | | |
| +----------------+ | gRPC | gRPC | User's |
| | gRPC Client | ------------------> | Server | Codes |
| +----------------+ | (HTTP/2) | | |
| | | | |
+========================+ +========+==========+
```
`emqx-exhook` 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。
和 emqx 原生的钩子一致emqx-exhook 也支持链式的方式计算和返回:
<img src="https://docs.emqx.net/broker/latest/cn/advanced/assets/chain_of_responsiblity.png" style="zoom:50%;" />
### gRPC 服务示例
用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中。例如,其支持的接口有:
```protobuff
syntax = "proto3";
package emqx.exhook.v1;
service HookProvider {
rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
}
EMQ X Third-party Runtimes
+========================+ +====================+
| Extension | | |
| +----------------+ | Hooks | Python scripts / |
| | Drivers | ------------------> | Java Classes / |
| +----------------+ | (pipe) | Others ... |
| | | |
+========================+ +====================+
```
### 配置文件示例
```
## 配置 gRPC 服务地址 (HTTP)
##
## s1 为服务器的名称
exhook.server.s1.url = http://127.0.0.1:9001
#### 驱动 配置
## 配置 gRPC 服务地址 (HTTPS)
```properties
## Driver type
##
## s2 为服务器名称
exhook.server.s2.url = https://127.0.0.1:9002
exhook.server.s2.cacertfile = ca.pem
exhook.server.s2.certfile = cert.pem
exhook.server.s2.keyfile = key.pem
## Exmaples:
## - python3 --- 仅配置 python3
## - python3, java, webhook --- 配置多个 Driver
exhook.dirvers = python3, java, webhook
## --- 具体 driver 的配置详情
## Python
exhook.dirvers.python3.path = data/extension/python
exhook.dirvers.python3.call_timeout = 5s
exhook.dirvers.python3.pool_size = 8
## java
exhook.drivers.java.path = data/extension/java
...
```
#### 钩子配置
钩子支持配置在配置文件中,例如:
```properties
exhook.rule.python3.client.connected = {"module": "client", "callback": "on_client_connected"}
exhook.rule.python3.message.publish = {"module": "client", "callback": "on_client_connected", "topics": ["#", "t/#"]}
```
***已废弃!!(冗余)***
### 驱动抽象
#### APIs
| 方法名 | 说明 | 入参 | 返回 |
| ------------------------ | -------- | ------ | ------ |
| `init` | 初始化 | - | 见下表 |
| `deinit` | 销毁 | - | - |
| `xxx `*(由init函数定义)* | 钩子回调 | 见下表 | 见下表 |
##### init 函数规格
```erlang
%% init 函数
%% HookSpec : 为用户在脚本中的 初始化函数指定的;他会与配置文件中的内容作为默认值,进行合并
%% 该参数的目的,用于 EMQ X 判断需要执行哪些 Hook 和 如何执行 Hook
%% State : 为用户自己管理的数据内容EMQ X 不关心它,只来回透传
init() -> {HookSpec, State}.
%% 例如:
{[{client_connect, callback_m(), callback_f(),#{}, {}}]}
%%--------------------------------------------------------------
%% Type Defines
-tpye hook_spec() :: [{hookname(), callback_m(), callback_f(), hook_opts()}].
-tpye state :: any().
-type hookname() :: client_connect
| client_connack
| client_connected
| client_disconnected
| client_authenticate
| client_check_acl
| client_subscribe
| client_unsubscribe
| session_created
| session_subscribed
| session_unsubscribed
| session_resumed
| session_discarded %% TODO: Should squash to `terminated` ?
| session_takeovered %% TODO: Should squash to `terminated` ?
| session_terminated
| message_publish
| message_delivered
| message_acked
| message_dropped.
-type callback_m() :: atom(). -- 回调的模块名称python 为脚本文件名称java 为类名webhook 为 URI 地址
-type callback_f() :: atom(). -- 回调的方法名称pythonjava 等为方法名webhook 为资源地址
-tpye hook_opts() :: [{hook_key(), any()}]. -- 配置项;配置该项钩子的行为
-type hook_key() :: topics | ...
```
##### deinit 函数规格
``` erlang
%% deinit 函数;不关心返回的任何内容
deinit() -> any().
```
##### 回调函数规格
| 钩子 | 入参 | 返回 |
| -------------------- | ----------------------------------------------------- | --------- |
| client_connect | `connifno`<br />`props` | - |
| client_connack | `connifno`<br />`rc`<br />`props` | - |
| client_connected | `clientinfo`<br /> | - |
| client_disconnected | `clientinfo`<br />`reason` | - |
| client_authenticate | `clientinfo`<br />`result` | `result` |
| client_check_acl | `clientinfo`<br />`pubsub`<br />`topic`<br />`result` | `result` |
| client_subscribe | `clientinfo`<br />`props`<br />`topicfilters` | - |
| client_unsubscribe | `clientinfo`<br />`props`<br />`topicfilters` | - |
| session_created | `clientinfo` | - |
| session_subscribed | `clientinfo`<br />`topic`<br />`subopts` | - |
| session_unsubscribed | `clientinfo`<br />`topic` | - |
| session_resumed | `clientinfo` | - |
| session_discared | `clientinfo` | - |
| session_takeovered | `clientinfo` | - |
| session_terminated | `clientinfo`<br />`reason` | - |
| message_publish | `messsage` | `message` |
| message_delivered | `clientinfo`<br />`message` | - |
| message_dropped | `message` | - |
| message_acked | `clientinfo`<br />`message` | - |
上表中包含数据格式为:
```erlang
-type conninfo :: [ {node, atom()}
, {clientid, binary()}
, {username, binary()}
, {peerhost, binary()}
, {sockport, integer()}
, {proto_name, binary()}
, {proto_ver, integer()}
, {keepalive, integer()}
].
-type clientinfo :: [ {node, atom()}
, {clientid, binary()}
, {username, binary()}
, {password, binary()}
, {peerhost, binary()}
, {sockport, integer()}
, {protocol, binary()}
, {mountpoint, binary()}
, {is_superuser, boolean()}
, {anonymous, boolean()}
].
-type message :: [ {node, atom()}
, {id, binary()}
, {qos, integer()}
, {from, binary()}
, {topic, binary()}
, {payload, binary()}
, {timestamp, integer()}
].
-type rc :: binary().
-type props :: [{key(), value()}]
-type topics :: [topic()].
-type topic :: binary().
-type pubsub :: publish | subscribe.
-type result :: true | false.
```
### 统计
在驱动运行过程中,应有对每种钩子调用计数,例如:
```
exhook.python3.check_acl 10
```
### 管理
**CLI 示例:**
**列出所有的驱动**
```
./bin/emqx_ctl exhook dirvers list
Drivers(xxx=yyy)
Drivers(aaa=bbb)
```
**开关驱动**
```
./bin/emqx_ctl exhook drivers enable python3
ok
./bin/emqx_ctl exhook drivers disable python3
ok
./bin/emqx_ctl exhook drivers stats
python3.client_connect 123
webhook.check_acl 20
```

View File

@ -1,21 +1,5 @@
%%-*- mode: erlang -*-
{plugins,
[rebar3_proper,
{grpcbox_plugin, {git, "https://github.com/zmstone/grpcbox_plugin", {branch, "master"}}}
]}.
{deps,
[{grpcbox, {git, "https://github.com/tsloughter/grpcbox", {branch, "master"}}}
]}.
{grpc,
[{protos, ["priv/protos"]},
{gpb_opts, [{module_name_prefix, "emqx_"},
{module_name_suffix, "_pb"}]}
]}.
{provider_hooks,
[{pre, [{compile, {grpc, gen}}]}]}.
{deps, []}.
{edoc_opts, [{preprocess, true}]}.
@ -29,18 +13,13 @@
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{xref_ignores, [emqx_exhook_pb]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{cover_excl_mods, [emqx_exhook_pb,
emqx_exhook_v_1_hook_provider_bhvr,
emqx_exhook_v_1_connection_client]}.
{profiles,
[{test, [
{deps, [ {emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.3.1"}}}
{deps, [ {emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.2.2"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}
]}

View File

@ -1,5 +0,0 @@
src/emqx_exproto_pb.erl
src/emqx_exproto_v_1_connection_adapter_bhvr.erl
src/emqx_exproto_v_1_connection_adapter_client.erl
src/emqx_exproto_v_1_connection_handler_bhvr.erl
src/emqx_exproto_v_1_connection_handler_client.erl

View File

@ -4,25 +4,53 @@ The `emqx_exproto` extremly enhance the extensibility for EMQ X. It allow using
## Feature
- [x] Based on gRPC, it brings a very wide range of applicability
- [x] Allows you to use the return value to extend emqx behavior.
- [x] Support Python, Java.
- [x] Support the `tcp`, `ssl`, `udp`, `dtls` socket.
- [x] Provide the `PUB/SUB` interface to others language.
We temporarily no plans to support other languages. Plaease open a issue if you have to use other programming languages.
## Architecture
![EMQ X ExProto Arch](./docs/images/exproto-arch.jpg)
## Usage
## Drivers
### gRPC service
### Python
See: `priv/protos/exproto.proto`
***Requirements:***
## Example
- It requires the emqx hosted machine has Python3 Runtimes
- An executable commands in your shell, i,g: `python3` or `python`
## Recommended gRPC Framework
***Examples:***
See: https://github.com/grpc-ecosystem/awesome-grpc
See [example/main.python](https://github.com/emqx/emqx-exproto/blob/master/example/main.py)
## Thanks
### Java
- [grpcbox](https://github.com/tsloughter/grpcbox)
See [example/Main.java](https://github.com/emqx/emqx-exproto/blob/master/example/Main.java)
## SDK
The SDK encloses the underlying obscure data types and function interfaces. It only provides a convenience for development, it is not required.
See [sdk/README.md](https://github.com/emqx/emqx-exproto/blob/master/sdk/README.md)
## Benchmark
***Work in progress...***
## Known Issues or TODOs
- Configurable Log System.
* The Java driver can not redirect the `stderr` stream to erlang vm on Windows platform
## Reference
- [erlport](https://github.com/hdima/erlport)
- [External Term Format](http://erlang.org/doc/apps/erts/erl_ext_dist.html)
- [The Ports Tutorial of Erlang](http://erlang.org/doc/tutorial/c_port.html)

View File

@ -4,124 +4,173 @@
该插件给 EMQ X 带来的扩展性十分的强大,它能以你熟悉语言处理任何的私有协议,并享受由 EMQ X 系统带来的高连接,和高并发的优点。
**声明:当前仅实现了 Python、Java 的支持**
## 特性
- 极强的扩展能力。使用 gRPC 作为 RPC 通信框架,支持各个主流编程语言
- 多语言支持。快速将接入层的协议实现迁移到 EMQ X 中进行管理
- 高吞吐。连接层以完全的异步非阻塞式 I/O 的方式实现
- 连接层透明。完全的支持 TCP\TLS UDP\DTLS 类型的连接管理,并对上层提供统一个 API
- 完善的连接层。完全的支持 TCP\TLS UDP\DTLS 类型的连接
- 连接层的管理能力。例如最大连接数连接和吞吐的速率限制IP 黑名单 等
## 架构
## 架构
![Extension-Protocol Arch](images/exproto-arch.jpg)
该插件主要需要处理的内容包括
该插件需要完成的工作包括三部分
1. **连接层:** 该部分主要**维持 Socket 的生命周期,和数据的收发**。它的功能要求包括:
- 监听某个端口。当有新的 TCP/UDP 连接到达后,启动一个连接进程,来维持连接的状态。
- 调用 `OnSocketCreated` 回调。用于通知外部模块**已新建立了一个连接**。
- 调用 `OnScoektClosed` 回调。用于通知外部模块连接**已关闭**。
- 调用 `OnReceivedBytes` 回调。用于通知外部模块**该连接新收到的数据包**。
- 提供 `Send` 接口。供外部模块调用,**用于发送数据包**。
- 提供 `Close` 接口。供外部模块调用,**用于主动关闭连接**。
**初始化:** (TODO)
- loaded:
- unload:
2. **协议/会话层:**该部分主要**提供 PUB/SUB 接口**,以实现与 EMQ X Broker 系统的消息互通。包括:
**连接层:** 该部分主要**维持 Socket 的生命周期,和数据的收发**。它的功能要求包括:
- 提供 `Authenticate` 接口。供外部模块调用,用于向集群注册客户端。
- 提供 `StartTimer` 接口。供外部模块调用,用于为该连接进程启动心跳等定时器。
- 提供 `Publish` 接口。供外部模块调用,用于发布消息 EMQ X Broker 中。
- 提供 `Subscribe` 接口。供外部模块调用,用于订阅某主题,以实现从 EMQ X Broker 中接收某些下行消息。
- 提供 `Unsubscribe` 接口。供外部模块调用,用于取消订阅某主题。
- 调用 `OnTimerTimeout` 回调。用于处理定时器超时的事件。
- 调用 `OnReceivedMessages` 回调。用于接收下行消息(在订阅主题成功后,如果主题上有消息,便会回调该方法)
- 监听某个端口。当有新的 TCP/UDP 连接到达后,启动一个连接进程,来维持连接的状态。
- 调用 `init` 回调。用于通知外部模块**已新建立了一个连接**。
- 调用 `terminated` 回调。用于通知外部模块连接**已关闭**。
- 调用 `received` 回调。用于通知外部模块**该连接新收到的数据包**。
- 提供 `send` 接口。供外部模块调用,**用于发送数据包**。
- 提供 `close` 接口。供外部模块调用,**用于主动关闭连接**。
**协议/会话层:**该部分主要**提供 PUB/SUB 接口**,以实现与 EMQ X Broker 系统的消息互通。包括:
- 提供 `register` 接口。供外部模块调用,用于向集群注册客户端。
- 提供 `publish` 接口。供外部模块调用,用于发布消息 EMQ X Broker 中。
- 提供 `subscribe` 接口。供外部模块调用,用于订阅某主题,以实现从 EMQ X Broker 中接收某些下行消息。
- 提供 `unsubscribe` 接口。供外部模块调用,用于取消订阅某主题。
- 调用 `deliver` 回调。用于接收下行消息(在订阅主题成功后,如果主题上有消息,便会回调该方法)
**管理&统计相关:** 该部分主要提供其他**管理&统计相关的接口**。包括:
- 提供 `Hooks` 类的接口。用于与系统的钩子系统进行交互。
- 提供 `Metrics` 类的接口。用于统计。
- 提供 `HTTP or CLI` 管理类接口。
## 接口设计
从 gRPC 上的逻辑来说emqx-exproto 会作为客户端向用户的 `ProtocolHandler` 服务发送回调请求。同时,它也会作为服务端向用户提供 `ConnectionAdapter` 服务,以提供 emqx-exproto 各个接口的访问。如图:
### 连接层接口
![Extension Protocol gRPC Arch](images/exproto-grpc-arch.jpg)
多语言组件需要向 EMQ X 注册的回调函数:
```erlang
%% Got a new Connection
init(conn(), conninfo()) -> state().
详情参见:`priv/protos/exproto.proto`,例如接口的定义有:
%% Incoming a data
recevied(conn(), data(), state()) -> state().
```protobuff
syntax = "proto3";
%% Socket & Connection process terminated
terminated(conn(), reason(), state()) -> ok.
package emqx.exproto.v1;
-opaue conn() :: pid().
// The Broker side serivce. It provides a set of APIs to
// handle a protcol access
service ConnectionAdapter {
-type conninfo() :: [ {socktype, tcp | tls | udp | dtls},
, {peername, {inet:ip_address(), inet:port_number()}},
, {sockname, {inet:ip_address(), inet:port_number()}},
, {peercert, nossl | [{cn, string()}, {dn, string()}]}
]).
// -- socket layer
-type reason() :: string().
rpc Send(SendBytesRequest) returns (CodeResponse) {};
rpc Close(CloseSocketRequest) returns (CodeResponse) {};
// -- protocol layer
rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {};
rpc StartTimer(TimerRequest) returns (CodeResponse) {};
// -- pub/sub layer
rpc Publish(PublishRequest) returns (CodeResponse) {};
rpc Subscribe(SubscribeRequest) returns (CodeResponse) {};
rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {};
}
service ConnectionHandler {
// -- socket layer
rpc OnSocketCreated(SocketCreatedRequest) returns (EmptySuccess) {};
rpc OnSocketClosed(SocketClosedRequest) returns (EmptySuccess) {};
rpc OnReceivedBytes(ReceivedBytesRequest) returns (EmptySuccess) {};
// -- pub/sub layer
rpc OnTimerTimeout(TimerTimeoutRequest) returns (EmptySuccess) {};
rpc OnReceivedMessages(ReceivedMessagesRequest) returns (EmptySuccess) {};
}
-type state() :: any().
```
`emqx-exproto` 需要向多语言插件提供的接口:
``` erlang
%% Send a data to socket
send(conn(), data()) -> ok.
%% Close the socket
close(conn() ) -> ok.
```
### 协议/会话层接口
多语言组件需要向 EMQ X 注册的回调函数:
```erlang
%% Received a message from a Topic
deliver(conn(), [message()], state()) -> state().
-type message() :: [ {id, binary()}
, {qos, integer()}
, {from, binary()}
, {topic, binary()}
, {payload, binary()}
, {timestamp, integer()}
].
```
`emqx-exproto` 需要向多语言插件提供的接口:
``` erlang
%% Reigster the client to Broker
register(conn(), clientinfo()) -> ok | {error, Reason}.
%% Publish a message to Broker
publish(conn(), message()) -> ok.
%% Subscribe a topic
subscribe(conn(), topic(), qos()) -> ok.
%% Unsubscribe a topic
unsubscribe(conn(), topic()) -> ok.
-type clientinfo() :: [ {proto_name, binary()}
, {proto_ver, integer() | string()}
, {clientid, binary()}
, {username, binary()}
, {mountpoint, binary()}}
, {keepalive, non_neg_integer()}
].
```
### 管理&统计相关接口
*TODO..*
## 配置项设计
1. 以 **监听器( Listener)** 为基础,提供 TCP/UDP 的监听。
- Listener 目前仅支持TCP、TLS、UDP、DTLS。(ws、wss、quic 暂不支持)
2. 每个监听器,会指定一个 `ProtocolHandler` 的服务地址,用于调用外部模块的接口。
3. emqx-exproto 还会监听一个 gRPC 端口用于提供对 `ConnectionAdapter` 服务的访问。
2. 每个监听器,会指定一个多语言的驱动,用于调用外部模块的接口
- Driver 目前仅支持pythonjava
例如:
``` properties
## gRPC 服务监听地址 (HTTP)
##
exproto.server.http.url = http://127.0.0.1:9002
## A JT/T 808 TCP based example:
exproto.listener.jtt808 = 6799
exproto.listener.jtt808.type = tcp
exproto.listener.jtt808.driver = python
# acceptors, max_connections, max_conn_rate, ...
# proxy_protocol, ...
# sndbuff, recbuff, ...
# ssl, cipher, certfile, psk, ...
## gRPC 服务监听地址 (HTTPS)
##
exproto.server.https.url = https://127.0.0.1:9002
exproto.server.https.cacertfile = ca.pem
exproto.server.https.certfile = cert.pem
exproto.server.https.keyfile = key.pem
## Listener 配置
## 例如,名称为 protoname 协议的 TCP 监听器配置
exproto.listener.protoname = tcp://0.0.0.0:7993
## ProtocolHandler 服务地址及 https 的证书配置
exproto.listener.protoname.proto_handler_url = http://127.0.0.1:9001
#exproto.listener.protoname.proto_handler_certfile =
#exproto.listener.protoname.proto_handler_cacertfile =
#exproto.listener.protoname.proto_handler_keyfile =
exproto.listener.jtt808.<key> = <value>
## A CoAP UDP based example
exproto.listener.coap = 6799
exproto.listener.coap.type = udp
exproto.listener.coap.driver = java
# ...
```
## 集成与调试
参见 SDK 规范、和对应语言的开发手册
## SDK 实现要求
参见 SDK 规范、和对应语言的开发手册
## TODOs:
- 认证 和 发布 订阅鉴权等钩子接入

Binary file not shown.

Before

Width:  |  Height:  |  Size: 71 KiB

After

Width:  |  Height:  |  Size: 84 KiB

View File

@ -22,16 +22,3 @@
%% TODO:
-define(UDP_SOCKOPTS, []).
%%--------------------------------------------------------------------
%% gRPC result code
-define(RESP_UNKNOWN, 'UNKNOWN').
-define(RESP_SUCCESS, 'SUCCESS').
-define(RESP_CONN_PROCESS_NOT_ALIVE, 'CONN_PROCESS_NOT_ALIVE').
-define(RESP_PARAMS_TYPE_ERROR, 'PARAMS_TYPE_ERROR').
-define(RESP_REQUIRED_PARAMS_MISSED, 'REQUIRED_PARAMS_MISSED').
-define(RESP_PERMISSION_DENY, 'PERMISSION_DENY').
-define(IS_GRPC_RESULT_CODE(C), ( C =:= ?RESP_SUCCESS
orelse C =:= ?RESP_CONN_PROCESS_NOT_ALIVE
orelse C =:= ?RESP_REQUIRED_PARAMS_MISSED
orelse C =:= ?RESP_PERMISSION_DENY)).

View File

@ -1,66 +1,25 @@
%% -*-: erlang -*-
%%--------------------------------------------------------------------
%% Services
{mapping, "exproto.server.http.port", "emqx_exproto.servers", [
{datatype, integer}
]}.
{mapping, "exproto.server.https.port", "emqx_exproto.servers", [
{datatype, integer}
]}.
{mapping, "exproto.server.https.cacertfile", "emqx_exproto.servers", [
{datatype, string}
]}.
{mapping, "exproto.server.https.certfile", "emqx_exproto.servers", [
{datatype, string}
]}.
{mapping, "exproto.server.https.keyfile", "emqx_exproto.servers", [
{datatype, string}
]}.
{translation, "emqx_exproto.servers", fun(Conf) ->
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
Http = case cuttlefish:conf_get("exproto.server.http.port", Conf, undefined) of
undefined -> [];
P1 -> [{http, P1, []}]
end,
Https = case cuttlefish:conf_get("exproto.server.https.port", Conf, undefined) of
undefined -> [];
P2 ->
[{https, P2,
Filter([{ssl, true},
{certfile, cuttlefish:conf_get("exproto.server.https.certfile", Conf)},
{keyfile, cuttlefish:conf_get("exproto.server.https.keyfile", Conf)},
{cacertfile, cuttlefish:conf_get("exproto.server.https.cacertfile", Conf)}])}]
end,
Http ++ Https
end}.
%%--------------------------------------------------------------------
%% Listeners
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% TCP Listeners
{mapping, "exproto.listener.$proto", "emqx_exproto.listeners", [
{datatype, string}
]}.
{mapping, "exproto.listener.$proto.connection_handler_url", "emqx_exproto.listeners", [
{mapping, "exproto.listener.$proto.driver", "emqx_exproto.listeners", [
{datatype, {enum, [python3, java]}}
]}.
{mapping, "exproto.listener.$proto.driver_search_path", "emqx_exproto.listeners", [
{datatype, string}
]}.
{mapping, "exproto.listener.$proto.connection_handler_certfile", "emqx_exproto.listeners", [
{datatype, string}
]}.
{mapping, "exproto.listener.$proto.connection_handler_cacertfile", "emqx_exproto.listeners", [
{datatype, string}
]}.
{mapping, "exproto.listener.$proto.connection_handler_keyfile", "emqx_exproto.listeners", [
{mapping, "exproto.listener.$proto.driver_callback_module", "emqx_exproto.listeners", [
{default, "main"},
{datatype, string}
]}.
@ -231,23 +190,14 @@ end}.
{Rate, Limit}
end,
HandlerOpts = fun(Prefix) ->
Opts =
case http_uri:parse(cuttlefish:conf_get(Prefix ++ ".connection_handler_url", Conf)) of
{ok, {http, _, Host, Port, _, _}} ->
[{scheme, http}, {host, Host}, {port, Port}];
{ok, {https, _, Host, Port, _, _}} ->
[{scheme, https}, {host, Host}, {port, Port},
{ssl_options,
Filter([{certfile, cuttlefish:conf_get(Prefix ++ ".connection_handler_certfile", Conf)},
{keyfile, cuttlefish:conf_get(Prefix ++ ".connection_handler_keyfile", Conf)},
{cacertfile, cuttlefish:conf_get(Prefix ++ ".connection_handler_cacertfile", Conf)}
])}];
_ ->
error(invaild_connection_handler_url)
end,
[{handler, Opts}]
end,
DriverOpts = fun(Prefix) ->
[{driver,
Filter([{type, cuttlefish:conf_get(Prefix ++ ".driver", Conf)},
{path, cuttlefish:conf_get(Prefix ++ ".driver_search_path", Conf)},
{cbm, Atom(cuttlefish:conf_get(Prefix ++ ".driver_callback_module", Conf))}
])
}]
end,
ConnOpts = fun(Prefix) ->
Filter([{active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)},
@ -339,7 +289,7 @@ end}.
Listeners = fun(Proto) ->
Prefix = string:join(["exproto","listener", Proto], "."),
Opts = HandlerOpts(Prefix) ++ ConnOpts(Prefix) ++ LisOpts(Prefix),
Opts = DriverOpts(Prefix) ++ ConnOpts(Prefix) ++ LisOpts(Prefix),
case cuttlefish:conf_get(Prefix, Conf, undefined) of
undefined -> [];
ListenOn0 ->

View File

@ -1,4 +1,7 @@
%%-*- mode: erlang -*-
{deps, [{erlport, {git, "https://github.com/emqx/erlport", {tag, "v1.2.2"}}}]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
@ -8,38 +11,12 @@
debug_info,
{parse_transform}]}.
{plugins,
[{grpcbox_plugin, {git, "https://github.com/zmstone/grpcbox_plugin", {branch, "master"}}}
]}.
{deps,
[{grpcbox, {git, "https://github.com/tsloughter/grpcbox", {branch, "master"}}}
]}.
{grpc,
[{type, all},
{protos, ["priv/protos"]},
{gpb_opts, [{module_name_prefix, "emqx_"},
{module_name_suffix, "_pb"}]}
]}.
{provider_hooks,
[{pre, [{compile, {grpc, gen}}]}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{xref_ignores, [emqx_exproto_pb]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{cover_excl_mods, [emqx_exproto_pb,
emqx_exproto_v_1_connection_adapter_client,
emqx_exproto_v_1_connection_adapter_bhvr,
emqx_exproto_v_1_connection_handler_client,
emqx_exproto_v_1_connection_handler_bhvr]}.
{profiles,
[{test, [

View File

@ -1,12 +1,14 @@
{application, emqx_exproto,
[{description, "EMQ X Extension for Protocol"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.3.0"}, %% strict semver
{modules, []},
{registered, []},
{mod, {emqx_exproto_app, []}},
{applications, [kernel,stdlib]},
{applications, [kernel, stdlib, erlport]},
{env,[]},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"}]}
{links, [{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx-extension-proto"}
]}
]}.

View File

@ -16,20 +16,24 @@
-module(emqx_exproto).
-compile({no_auto_import, [register/1]}).
-include("emqx_exproto.hrl").
-export([ start_listeners/0
, stop_listeners/0
, start_listener/1
, start_listener/4
, stop_listener/4
, stop_listener/1
]).
-export([ start_servers/0
, stop_servers/0
, start_server/1
, stop_server/1
%% APIs: Connection level
-export([ send/2
, close/1
]).
%% APIs: Protocol/Session level
-export([ register/2
, publish/2
, subscribe/3
, unsubscribe/2
]).
%%--------------------------------------------------------------------
@ -38,71 +42,78 @@
-spec(start_listeners() -> ok).
start_listeners() ->
Listeners = application:get_env(?APP, listeners, []),
NListeners = [start_connection_handler_instance(Listener)
|| Listener <- Listeners],
lists:foreach(fun start_listener/1, NListeners).
lists:foreach(fun start_listener/1, application:get_env(?APP, listeners, [])).
-spec(stop_listeners() -> ok).
stop_listeners() ->
Listeners = application:get_env(?APP, listeners, []),
lists:foreach(fun stop_connection_handler_instance/1, Listeners),
lists:foreach(fun stop_listener/1, Listeners).
lists:foreach(fun stop_listener/1, application:get_env(?APP, listeners, [])).
-spec(start_servers() -> ok).
start_servers() ->
lists:foreach(fun start_server/1, application:get_env(?APP, servers, [])).
%%--------------------------------------------------------------------
%% APIs - Connection level
%%--------------------------------------------------------------------
-spec(stop_servers() -> ok).
stop_servers() ->
lists:foreach(fun stop_server/1, application:get_env(?APP, servers, [])).
-spec(send(pid(), binary()) -> ok).
send(Conn, Data) when is_pid(Conn), is_binary(Data) ->
emqx_exproto_conn:cast(Conn, {send, Data}).
-spec(close(pid()) -> ok).
close(Conn) when is_pid(Conn) ->
emqx_exproto_conn:cast(Conn, close).
%%--------------------------------------------------------------------
%% APIs - Protocol/Session level
%%--------------------------------------------------------------------
-spec(register(pid(), list()) -> ok | {error, any()}).
register(Conn, ClientInfo0) ->
case emqx_exproto_types:parse(clientinfo, ClientInfo0) of
{error, Reason} ->
{error, Reason};
ClientInfo ->
emqx_exproto_conn:cast(Conn, {register, ClientInfo})
end.
-spec(publish(pid(), list()) -> ok | {error, any()}).
publish(Conn, Msg0) when is_pid(Conn), is_list(Msg0) ->
case emqx_exproto_types:parse(message, Msg0) of
{error, Reason} ->
{error, Reason};
Msg ->
emqx_exproto_conn:cast(Conn, {publish, Msg})
end.
-spec(subscribe(pid(), binary(), emqx_types:qos()) -> ok | {error, any()}).
subscribe(Conn, Topic, Qos)
when is_pid(Conn), is_binary(Topic),
(Qos =:= 0 orelse Qos =:= 1 orelse Qos =:= 2) ->
emqx_exproto_conn:cast(Conn, {subscribe, Topic, Qos}).
-spec(unsubscribe(pid(), binary()) -> ok | {error, any()}).
unsubscribe(Conn, Topic)
when is_pid(Conn), is_binary(Topic) ->
emqx_exproto_conn:cast(Conn, {unsubscribe, Topic}).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
start_connection_handler_instance({_Proto, _LisType, _ListenOn, Opts}) ->
Name = name(_Proto, _LisType),
{value, {_, HandlerOpts}, LisOpts} = lists:keytake(handler, 1, Opts),
{Endpoints, ChannelOptions} = handler_opts(HandlerOpts),
case emqx_exproto_sup:start_grpc_client_channel(Name, Endpoints, ChannelOptions) of
{ok, _ClientChannelPid} ->
{_Proto, _LisType, _ListenOn, [{handler, Name} | LisOpts]};
{error, Reason} ->
io:format(standard_error, "Failed to start ~s's connection handler - ~0p~n!",
[Name, Reason]),
error(Reason)
end.
stop_connection_handler_instance({_Proto, _LisType, _ListenOn, _Opts}) ->
Name = name(_Proto, _LisType),
_ = emqx_exproto_sup:stop_grpc_client_channel(Name),
ok.
start_server({Name, Port, SSLOptions}) ->
case emqx_exproto_sup:start_grpc_server(Name, Port, SSLOptions) of
{ok, _} ->
io:format("Start ~s gRPC server on ~w successfully.~n",
[Name, Port]);
{error, Reason} ->
io:format(standard_error, "Failed to start ~s gRPC server on ~w - ~0p~n!",
[Name, Port, Reason]),
error({failed_start_server, Reason})
end.
stop_server({Name, Port, _SSLOptions}) ->
ok = emqx_exproto_sup:stop_grpc_server(Name),
io:format("Stop ~s gRPC server on ~w successfully.~n", [Name, Port]).
start_listener({Proto, LisType, ListenOn, Opts}) ->
Name = name(Proto, LisType),
case start_listener(LisType, Name, ListenOn, Opts) of
{ok, _} ->
io:format("Start ~s listener on ~s successfully.~n",
[Name, format(ListenOn)]);
{value, {_, DriverOpts}, LisOpts} = lists:keytake(driver, 1, Opts),
case emqx_exproto_driver_mngr:ensure_driver(Name, DriverOpts) of
{ok, _DriverPid}->
case start_listener(LisType, Name, ListenOn, [{driver, Name} |LisOpts]) of
{ok, _} ->
io:format("Start ~s listener on ~s successfully.~n",
[Name, format(ListenOn)]);
{error, Reason} ->
io:format(standard_error, "Failed to start ~s listener on ~s - ~0p~n!",
[Name, format(ListenOn), Reason]),
error(Reason)
end;
{error, Reason} ->
io:format(standard_error, "Failed to start ~s listener on ~s - ~0p~n!",
[Name, format(ListenOn), Reason]),
io:format(standard_error, "Failed to start ~s's driver - ~0p~n!",
[Name, Reason]),
error(Reason)
end.
@ -126,11 +137,11 @@ start_listener(dtls, Name, ListenOn, LisOpts) ->
stop_listener({Proto, LisType, ListenOn, Opts}) ->
Name = name(Proto, LisType),
_ = emqx_exproto_driver_mngr:stop_driver(Name),
StopRet = stop_listener(LisType, Name, ListenOn, Opts),
case StopRet of
ok ->
io:format("Stop ~s listener on ~s successfully.~n",
[Name, format(ListenOn)]);
ok -> io:format("Stop ~s listener on ~s successfully.~n",
[Name, format(ListenOn)]);
{error, Reason} ->
io:format(standard_error, "Failed to stop ~s listener on ~s - ~p~n.",
[Name, format(ListenOn), Reason])
@ -146,12 +157,8 @@ name(Proto, LisType) ->
list_to_atom(lists:flatten(io_lib:format("~s:~s", [Proto, LisType]))).
%% @private
format(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format({Addr, Port}) when is_list(Addr) ->
io_lib:format("~s:~w", [Addr, Port]);
format({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
io_lib:format("~s:~w", [Addr, Port]).
%% @private
merge_tcp_default(Opts) ->
@ -169,15 +176,3 @@ merge_udp_default(Opts) ->
false ->
[{udp_options, ?UDP_SOCKOPTS} | Opts]
end.
%% @private
handler_opts(Opts) ->
Scheme = proplists:get_value(scheme, Opts),
Host = proplists:get_value(host, Opts),
Port = proplists:get_value(port, Opts),
Options = proplists:get_value(options, Opts, []),
SslOpts = case Scheme of
https -> proplists:get_value(ssl_options, Opts, []);
_ -> []
end,
{[{Scheme, Host, Port, SslOpts}], maps:from_list(Options)}.

View File

@ -24,14 +24,13 @@
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_exproto_sup:start_link(),
emqx_exproto:start_servers(),
emqx_exproto:start_listeners(),
{ok, Sup}.
prep_stop(State) ->
emqx_exproto:stop_servers(),
emqx_exproto:stop_listeners(),
State.
stop(_State) ->
ok.

View File

@ -16,7 +16,6 @@
-module(emqx_exproto_channel).
-include("emqx_exproto.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/types.hrl").
@ -42,26 +41,20 @@
-export_type([channel/0]).
-record(channel, {
%% gRPC channel options
gcli :: map(),
%% Driver name
driver :: atom(),
%% Conn info
conninfo :: emqx_types:conninfo(),
%% Client info from `register` function
clientinfo :: maybe(map()),
%% Registered
registered = false :: boolean(),
%% Connection state
conn_state :: conn_state(),
%% Subscription
subscriptions = #{},
%% Request queue
rqueue = queue:new(),
%% Inflight function name
inflight = undefined,
%% Keepalive
keepalive :: maybe(emqx_keepalive:keepalive()),
%% Timers
timers :: #{atom() => disabled | maybe(reference())},
%% Closed reason
closed_reason = undefined
%% Driver level state
state :: any()
}).
-opaque(channel() :: #channel{}).
@ -74,11 +67,6 @@
-type(replies() :: emqx_types:packet() | reply() | [reply()]).
-define(TIMER_TABLE, #{
alive_timer => keepalive,
force_timer => force_close
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
-define(SESSION_STATS_KEYS,
@ -142,44 +130,20 @@ stats(#channel{subscriptions = Subs}) ->
%%--------------------------------------------------------------------
-spec(init(emqx_exproto_types:conninfo(), proplists:proplist()) -> channel()).
init(ConnInfo = #{socktype := Socktype,
peername := Peername,
sockname := Sockname,
peercert := Peercert}, Options) ->
GRpcChann = proplists:get_value(handler, Options),
NConnInfo = default_conninfo(ConnInfo),
ClientInfo = default_clientinfo(ConnInfo),
Channel = #channel{gcli = #{channel => GRpcChann},
conninfo = NConnInfo,
clientinfo = ClientInfo,
conn_state = connecting,
timers = #{}
},
Req = #{conninfo =>
peercert(Peercert,
#{socktype => socktype(Socktype),
peername => address(Peername),
sockname => address(Sockname)})},
try_dispatch(on_socket_created, wrap(Req), Channel).
%% @private
peercert(nossl, ConnInfo) ->
ConnInfo;
peercert(Peercert, ConnInfo) ->
ConnInfo#{peercert =>
#{cn => esockd_peercert:common_name(Peercert),
dn => esockd_peercert:subject(Peercert)}}.
%% @private
socktype(tcp) -> 'TCP';
socktype(ssl) -> 'SSL';
socktype(udp) -> 'UDP';
socktype(dtls) -> 'DTLS'.
%% @private
address({Host, Port}) ->
#{host => inet:ntoa(Host), port => Port}.
init(ConnInfo, Options) ->
Driver = proplists:get_value(driver, Options),
case cb_init(ConnInfo, Driver) of
{ok, DState} ->
NConnInfo = default_conninfo(ConnInfo),
ClientInfo = default_clientinfo(ConnInfo),
#channel{driver = Driver,
state = DState,
conninfo = NConnInfo,
clientinfo = ClientInfo,
conn_state = connected};
{error, Reason} ->
exit({init_channel_failed, Reason})
end.
%%--------------------------------------------------------------------
%% Handle incoming packet
@ -189,163 +153,81 @@ address({Host, Port}) ->
-> {ok, channel()}
| {shutdown, Reason :: term(), channel()}).
handle_in(Data, Channel) ->
Req = #{bytes => Data},
{ok, try_dispatch(on_received_bytes, wrap(Req), Channel)}.
case cb_received(Data, Channel) of
{ok, NChannel} ->
{ok, NChannel};
{error, Reason} ->
{shutdown, Reason, Channel}
end.
-spec(handle_deliver(list(emqx_types:deliver()), channel())
-> {ok, channel()}
| {shutdown, Reason :: term(), channel()}).
handle_deliver(Delivers, Channel = #channel{clientinfo = ClientInfo}) ->
%% XXX: ?? Nack delivers from shared subscriptions
Mountpoint = maps:get(mountpoint, ClientInfo),
NodeStr = atom_to_binary(node(), utf8),
Msgs = lists:map(fun({_, _, Msg}) ->
ok = emqx_metrics:inc('messages.delivered'),
Msg1 = emqx_hooks:run_fold('message.delivered',
[ClientInfo], Msg),
NMsg = emqx_mountpoint:unmount(Mountpoint, Msg1),
#{node => NodeStr,
id => hexstr(emqx_message:id(NMsg)),
qos => emqx_message:qos(NMsg),
from => fmt_from(emqx_message:from(NMsg)),
topic => emqx_message:topic(NMsg),
payload => emqx_message:payload(NMsg),
timestamp => emqx_message:timestamp(NMsg)
}
end, Delivers),
Req = #{messages => Msgs},
{ok, try_dispatch(on_received_messages, wrap(Req), Channel)}.
handle_deliver(Delivers, Channel) ->
%% TODO: ?? Nack delivers from shared subscriptions
case cb_deliver(Delivers, Channel) of
{ok, NChannel} ->
{ok, NChannel};
{error, Reason} ->
{shutdown, Reason, Channel}
end.
-spec(handle_timeout(reference(), Msg :: term(), channel())
-> {ok, channel()}
| {shutdown, Reason :: term(), channel()}).
handle_timeout(_TRef, {keepalive, _StatVal},
Channel = #channel{keepalive = undefined}) ->
{ok, Channel};
handle_timeout(_TRef, {keepalive, StatVal},
Channel = #channel{keepalive = Keepalive}) ->
case emqx_keepalive:check(StatVal, Keepalive) of
{ok, NKeepalive} ->
NChannel = Channel#channel{keepalive = NKeepalive},
{ok, reset_timer(alive_timer, NChannel)};
{error, timeout} ->
Req = #{type => 'KEEPALIVE'},
{ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)}
end;
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
{shutdown, {error, {force_close, Reason}}, Channel};
handle_timeout(_TRef, Msg, Channel) ->
?WARN("Unexpected timeout: ~p", [Msg]),
{ok, Channel}.
-spec(handle_call(any(), channel())
-> {reply, Reply :: term(), channel()}
| {reply, Reply :: term(), replies(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), channel()}).
handle_call({send, Data}, Channel) ->
{reply, ok, [{outgoing, Data}], Channel};
handle_call(close, Channel = #channel{conn_state = connected}) ->
{reply, ok, [{event, disconnected}, {close, normal}], Channel};
handle_call(close, Channel) ->
{reply, ok, [{close, normal}], Channel};
handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) ->
?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
{ok, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
handle_call({auth, ClientInfo0, Password},
Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
NConnInfo = enrich_conninfo(ClientInfo1, ConnInfo),
Channel1 = Channel#channel{conninfo = NConnInfo,
clientinfo = ClientInfo1},
#{clientid := ClientId, username := Username} = ClientInfo1,
case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of
{ok, AuthResult} ->
emqx_logger:set_metadata_clientid(ClientId),
is_anonymous(AuthResult) andalso
emqx_metrics:inc('client.auth.anonymous'),
NClientInfo = maps:merge(ClientInfo1, AuthResult),
NChannel = Channel1#channel{clientinfo = NClientInfo},
case emqx_cm:open_session(true, NClientInfo, NConnInfo) of
{ok, _Session} ->
?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
[ClientId, Username]),
{reply, ok, [{event, connected}], ensure_connected(NChannel)};
{error, Reason} ->
?LOG(warning, "Client ~s (Username: '~s') open session failed for ~0p",
[ClientId, Username, Reason]),
{reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
end;
{error, Reason} ->
?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
[ClientId, Username, Reason]),
{reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
end;
handle_call({start_timer, keepalive, Interval},
Channel = #channel{
conninfo = ConnInfo,
clientinfo = ClientInfo
}) ->
NConnInfo = ConnInfo#{keepalive => Interval},
NClientInfo = ClientInfo#{keepalive => Interval},
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
{reply, ok, ensure_keepalive(NChannel)};
handle_call({subscribe, TopicFilter, Qos},
Channel = #channel{
conn_state = connected,
clientinfo = ClientInfo}) ->
case is_acl_enabled(ClientInfo) andalso
emqx_access_control:check_acl(ClientInfo, subscribe, TopicFilter) of
deny ->
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
_ ->
{ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
{reply, ok, NChannel}
end;
handle_call({unsubscribe, TopicFilter},
Channel = #channel{conn_state = connected}) ->
{ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
{reply, ok, NChannel};
handle_call({publish, Topic, Qos, Payload},
Channel = #channel{
conn_state = connected,
clientinfo = ClientInfo
= #{clientid := From,
mountpoint := Mountpoint}}) ->
case is_acl_enabled(ClientInfo) andalso
emqx_access_control:check_acl(ClientInfo, publish, Topic) of
deny ->
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
_ ->
Msg = emqx_message:make(From, Qos, Topic, Payload),
NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
emqx:publish(NMsg),
{reply, ok, Channel}
end;
handle_call(kick, Channel) ->
{shutdown, kicked, ok, Channel};
handle_call(Req, Channel) ->
?LOG(warning, "Unexpected call: ~p", [Req]),
{reply, {error, unexpected_call}, Channel}.
?WARN("Unexpected call: ~p", [Req]),
{reply, ok, Channel}.
-spec(handle_cast(any(), channel())
-> {ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}).
handle_cast({send, Data}, Channel) ->
{ok, [{outgoing, Data}], Channel};
handle_cast(close, Channel) ->
{ok, [{close, normal}], Channel};
handle_cast({register, ClientInfo}, Channel = #channel{registered = true}) ->
?WARN("Duplicated register command, dropped ~p", [ClientInfo]),
{ok, Channel};
handle_cast({register, ClientInfo0}, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
ClientInfo1 = maybe_assign_clientid(ClientInfo0),
NConnInfo = enrich_conninfo(ClientInfo1, ConnInfo),
NClientInfo = enrich_clientinfo(ClientInfo1, ClientInfo),
case emqx_cm:open_session(true, NClientInfo, NConnInfo) of
{ok, _Session} ->
NChannel = Channel#channel{registered = true,
conninfo = NConnInfo,
clientinfo = NClientInfo},
{ok, [{event, registered}], NChannel};
{error, Reason} ->
?ERROR("Register failed, reason: ~p", [Reason]),
{shutdown, Reason, {error, Reason}, Channel}
end;
handle_cast({subscribe, TopicFilter, Qos}, Channel) ->
do_subscribe([{TopicFilter, #{qos => Qos}}], Channel);
handle_cast({unsubscribe, TopicFilter}, Channel) ->
do_unsubscribe([{TopicFilter, #{}}], Channel);
handle_cast({publish, Msg}, Channel) ->
emqx:publish(enrich_msg(Msg, Channel)),
{ok, Channel};
handle_cast(Req, Channel) ->
?WARN("Unexpected call: ~p", [Req]),
{ok, Channel}.
@ -359,41 +241,15 @@ handle_info({subscribe, TopicFilters}, Channel) ->
handle_info({unsubscribe, TopicFilters}, Channel) ->
do_unsubscribe(TopicFilters, Channel);
handle_info({sock_closed, Reason},
Channel = #channel{rqueue = Queue, inflight = Inflight}) ->
case queue:len(Queue) =:= 0
andalso Inflight =:= undefined of
true ->
{shutdown, {sock_closed, Reason}, Channel};
_ ->
%% delayed close process for flushing all callback funcs to gRPC server
Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}},
Channel2 = ensure_timer(force_timer, Channel1),
{ok, ensure_disconnected({sock_closed, Reason}, Channel2)}
end;
handle_info({hreply, on_socket_created, {ok, _}}, Channel) ->
dispatch_or_close_process(Channel#channel{inflight = undefined});
handle_info({hreply, FunName, {ok, _}}, Channel)
when FunName == on_socket_closed;
FunName == on_received_bytes;
FunName == on_received_messages;
FunName == on_timer_timeout ->
dispatch_or_close_process(Channel#channel{inflight = undefined});
handle_info({hreply, FunName, {error, Reason}}, Channel) ->
{shutdown, {error, {FunName, Reason}}, Channel};
handle_info({sock_closed, Reason}, Channel) ->
{shutdown, {sock_closed, Reason}, Channel};
handle_info(Info, Channel) ->
?LOG(warning, "Unexpected info: ~p", [Info]),
?WARN("Unexpected info: ~p", [Info]),
{ok, Channel}.
-spec(terminate(any(), channel()) -> channel()).
-spec(terminate(any(), channel()) -> ok).
terminate(Reason, Channel) ->
Req = #{reason => stringfy(Reason)},
try_dispatch(on_socket_closed, wrap(Req), Channel).
is_anonymous(#{anonymous := true}) -> true;
is_anonymous(_AuthResult) -> false.
cb_terminated(Reason, Channel), ok.
%%--------------------------------------------------------------------
%% Sub/UnSub
@ -410,22 +266,11 @@ do_subscribe(TopicFilters, Channel) ->
do_subscribe(TopicFilter, SubOpts, Channel =
#channel{clientinfo = ClientInfo = #{mountpoint := Mountpoint},
subscriptions = Subs}) ->
%% Mountpoint first
NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter),
NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts),
SubId = maps:get(clientid, ClientInfo, undefined),
IsNew = not maps:is_key(NTopicFilter, Subs),
case IsNew of
true ->
ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts),
ok = emqx_hooks:run('session.subscribed',
[ClientInfo, NTopicFilter, NSubOpts#{is_new => IsNew}]),
Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}};
_ ->
%% Update subopts
ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts),
Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}
end.
_ = emqx:subscribe(NTopicFilter, SubId, NSubOpts),
Channel#channel{subscriptions = Subs#{NTopicFilter => SubOpts}}.
do_unsubscribe(TopicFilters, Channel) ->
NChannel = lists:foldl(
@ -435,133 +280,74 @@ do_unsubscribe(TopicFilters, Channel) ->
{ok, NChannel}.
%% @private
do_unsubscribe(TopicFilter, UnSubOpts, Channel =
#channel{clientinfo = ClientInfo = #{mountpoint := Mountpoint},
do_unsubscribe(TopicFilter, _SubOpts, Channel =
#channel{clientinfo = #{mountpoint := Mountpoint},
subscriptions = Subs}) ->
NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter),
case maps:find(NTopicFilter, Subs) of
{ok, SubOpts} ->
ok = emqx:unsubscribe(NTopicFilter),
ok = emqx_hooks:run('session.unsubscribed',
[ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]),
Channel#channel{subscriptions = maps:remove(NTopicFilter, Subs)};
_ ->
Channel
end.
TopicFilter1 = emqx_mountpoint:mount(Mountpoint, TopicFilter),
_ = emqx:unsubscribe(TopicFilter1),
Channel#channel{subscriptions = maps:remove(TopicFilter1, Subs)}.
%% @private
parse_topic_filters(TopicFilters) ->
lists:map(fun emqx_topic:parse/1, TopicFilters).
-compile({inline, [is_acl_enabled/1]}).
is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
(not IsSuperuser) andalso emqx_zone:enable_acl(Zone).
%%--------------------------------------------------------------------
%% Ensure & Hooks
%% Cbs for driver
%%--------------------------------------------------------------------
ensure_connected(Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
Channel#channel{conninfo = NConnInfo,
conn_state = connected
}.
cb_init(ConnInfo, Driver) ->
Args = [self(), emqx_exproto_types:serialize(conninfo, ConnInfo)],
emqx_exproto_driver_mngr:call(Driver, {'init', Args}).
ensure_disconnected(Reason, Channel = #channel{
conn_state = connected,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected};
cb_received(Data, Channel = #channel{state = DState}) ->
Args = [self(), Data, DState],
do_call_cb('received', Args, Channel).
ensure_disconnected(_Reason, Channel = #channel{conninfo = ConnInfo}) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
cb_terminated(Reason, Channel = #channel{state = DState}) ->
Args = [self(), stringfy(Reason), DState],
do_call_cb('terminated', Args, Channel).
run_hooks(Name, Args) ->
ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
cb_deliver(Delivers, Channel = #channel{state = DState}) ->
Msgs = [emqx_exproto_types:serialize(message, Msg) || {_, _, Msg} <- Delivers],
Args = [self(), Msgs, DState],
do_call_cb('deliver', Args, Channel).
%%--------------------------------------------------------------------
%% Enrich Keepalive
ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) ->
ensure_keepalive_timer(maps:get(keepalive, ClientInfo, 0), Channel).
ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
Channel;
ensure_keepalive_timer(Interval, Channel) ->
Keepalive = emqx_keepalive:init(timer:seconds(Interval)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
TRef = maps:get(Name, Timers, undefined),
Time = interval(Name, Channel),
case TRef == undefined andalso Time > 0 of
true -> ensure_timer(Name, Time, Channel);
false -> Channel %% Timer disabled or exists
%% @private
do_call_cb(Fun, Args, Channel = #channel{driver = D}) ->
case emqx_exproto_driver_mngr:call(D, {Fun, Args}) of
ok ->
{ok, Channel};
{ok, NDState} ->
{ok, Channel#channel{state = NDState}};
{error, Reason} ->
{error, Reason}
end.
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Msg = maps:get(Name, ?TIMER_TABLE),
TRef = emqx_misc:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->
ensure_timer(Name, clean_timer(Name, Channel)).
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(force_timer, _) ->
15000;
interval(alive_timer, #channel{keepalive = Keepalive}) ->
emqx_keepalive:info(interval, Keepalive).
%%--------------------------------------------------------------------
%% Dispatch
%%--------------------------------------------------------------------
wrap(Req) ->
Req#{conn => pid_to_list(self())}.
dispatch_or_close_process(Channel = #channel{
rqueue = Queue,
inflight = undefined,
gcli = GClient}) ->
case queue:out(Queue) of
{empty, _} ->
case Channel#channel.conn_state of
disconnected ->
{shutdown, Channel#channel.closed_reason, Channel};
_ ->
{ok, Channel}
end;
{{value, {FunName, Req}}, NQueue} ->
emqx_exproto_gcli:async_call(FunName, Req, GClient),
{ok, Channel#channel{inflight = FunName, rqueue = NQueue}}
end.
try_dispatch(FunName, Req, Channel = #channel{inflight = undefined, gcli = GClient}) ->
emqx_exproto_gcli:async_call(FunName, Req, GClient),
Channel#channel{inflight = FunName};
try_dispatch(FunName, Req, Channel = #channel{rqueue = Queue}) ->
Channel#channel{rqueue = queue:in({FunName, Req}, Queue)}.
%%--------------------------------------------------------------------
%% Format
%%--------------------------------------------------------------------
maybe_assign_clientid(ClientInfo) ->
case maps:get(clientid, ClientInfo, undefined) of
undefined ->
ClientInfo#{clientid => emqx_guid:to_base62(emqx_guid:gen())};
_ ->
ClientInfo
end.
enrich_msg(Msg, #channel{clientinfo = ClientInfo = #{mountpoint := Mountpoint}}) ->
NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
case maps:get(clientid, ClientInfo, undefined) of
undefined -> NMsg;
ClientId -> NMsg#message{from = ClientId}
end.
enrich_conninfo(InClientInfo, ConnInfo) ->
Ks = [proto_name, proto_ver, clientid, username],
maps:merge(ConnInfo, maps:with(Ks, InClientInfo)).
maps:merge(ConnInfo, maps:with([proto_name, proto_ver, clientid, username, keepalive], InClientInfo)).
enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) ->
Ks = [clientid, username, mountpoint],
NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)),
NClientInfo#{protocol => ProtoName}.
NClientInfo = maps:merge(ClientInfo, maps:with([clientid, username, mountpoint], InClientInfo)),
NClientInfo#{protocol => lowcase_atom(ProtoName)}.
default_conninfo(ConnInfo) ->
ConnInfo#{proto_name => undefined,
@ -577,12 +363,12 @@ default_conninfo(ConnInfo) ->
expiry_interval => 0}.
default_clientinfo(#{peername := {PeerHost, _},
sockname := {_, SockPort}}) ->
#{zone => external,
sockname := {_, SockPort}}) ->
#{zone => undefined,
protocol => undefined,
peerhost => PeerHost,
sockport => SockPort,
clientid => undefined,
clientid => default_clientid(),
username => undefined,
is_bridge => false,
is_superuser => false,
@ -591,9 +377,10 @@ default_clientinfo(#{peername := {PeerHost, _},
stringfy(Reason) ->
unicode:characters_to_binary((io_lib:format("~0p", [Reason]))).
hexstr(Bin) ->
[io_lib:format("~2.16.0B",[X]) || <<X:8>> <= Bin].
lowcase_atom(undefined) ->
undefined;
lowcase_atom(S) ->
binary_to_atom(string:lowercase(S), utf8).
fmt_from(undefined) -> <<>>;
fmt_from(Bin) when is_binary(Bin) -> Bin;
fmt_from(T) -> stringfy(T).
default_clientid() ->
<<"exproto_client_", (list_to_binary(pid_to_list(self())))/binary>>.

View File

@ -173,10 +173,8 @@ esockd_wait({esockd_transport, Sock}) ->
R = {error, _} -> R
end.
esockd_close({udp, _SockPid, _Sock}) ->
%% nothing to do for udp socket
%%gen_udp:close(Sock);
ok;
esockd_close({udp, _SockPid, Sock}) ->
gen_udp:close(Sock);
esockd_close({esockd_transport, Sock}) ->
esockd_transport:fast_close(Sock).
@ -359,9 +357,6 @@ handle_msg({'$gen_call', From, Req}, State) ->
{reply, Reply, NState} ->
gen_server:reply(From, Reply),
{ok, NState};
{reply, Reply, Msgs, NState} ->
gen_server:reply(From, Reply),
{ok, next_msgs(Msgs), NState};
{stop, Reason, Reply, NState} ->
gen_server:reply(From, Reply),
stop(Reason, NState)
@ -424,16 +419,16 @@ handle_msg({close, Reason}, State) ->
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) ->
handle_msg({event, registered}, State = #state{channel = Channel}) ->
ClientId = emqx_exproto_channel:info(clientid, Channel),
emqx_cm:register_channel(ClientId, info(State), stats(State));
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
ClientId = emqx_exproto_channel:info(clientid, Channel),
emqx_cm:set_chan_info(ClientId, info(State)),
emqx_cm:connection_closed(ClientId),
{ok, State};
%handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
% ClientId = emqx_exproto_channel:info(clientid, Channel),
% emqx_cm:set_chan_info(ClientId, info(State)),
% emqx_cm:connection_closed(ClientId),
% {ok, State};
%
%handle_msg({event, _Other}, State = #state{channel = Channel}) ->
% ClientId = emqx_exproto_channel:info(clientid, Channel),
% emqx_cm:set_chan_info(ClientId, info(State)),
@ -485,8 +480,6 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
case emqx_exproto_channel:handle_call(Req, Channel) of
{reply, Reply, NChannel} ->
{reply, Reply, State#state{channel = NChannel}};
{reply, Reply, Replies, NChannel} ->
{reply, Reply, Replies, State#state{channel = NChannel}};
{shutdown, Reason, Reply, NChannel} ->
shutdown(Reason, Reply, State#state{channel = NChannel})
end.
@ -502,18 +495,7 @@ handle_timeout(_TRef, limit_timeout, State) ->
limit_timer = undefined
},
handle_info(activate_socket, NState);
handle_timeout(TRef, keepalive, State = #state{socket = Socket,
channel = Channel})->
case emqx_exproto_channel:info(conn_state, Channel) of
disconnected -> {ok, State};
_ ->
case esockd_getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} ->
handle_timeout(TRef, {keepalive, RecvOct}, State);
{error, Reason} ->
handle_info({sock_error, Reason}, State)
end
end;
handle_timeout(_TRef, emit_stats, State =
#state{channel = Channel}) ->
ClientId = emqx_exproto_channel:info(clientid, Channel),
@ -683,3 +665,4 @@ stop(Reason, State) ->
stop(Reason, Reply, State) ->
{stop, Reason, Reply, State}.

View File

@ -1,110 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% the gRPC client worker for ConnectionHandler service
-module(emqx_exproto_gcli).
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
-logger_header("[ExProto gClient]").
%% APIs
-export([async_call/3]).
-export([start_link/2]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(CONN_ADAPTER_MOD, emqx_exproto_v_1_connection_handler_client).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
?MODULE, [Pool, Id], []).
async_call(FunName, Req = #{conn := Conn}, Options) ->
cast(pick(Conn), {rpc, FunName, Req, Options, self()}).
%%--------------------------------------------------------------------
%% cast, pick
%%--------------------------------------------------------------------
-compile({inline, [cast/2, pick/1]}).
cast(Deliver, Msg) ->
gen_server:cast(Deliver, Msg).
pick(Conn) ->
gproc_pool:pick_worker(exproto_gcli_pool, Conn).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #{pool => Pool, id => Id}}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({rpc, Fun, Req, Options, From}, State) ->
case catch apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of
{ok, Resp, _Metadata} ->
?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]),
reply(From, Fun, {ok, Resp});
{error, {Code, Msg}, _Metadata} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
[?CONN_ADAPTER_MOD, Fun, Req, Options, Code, Msg]),
reply(From, Fun, {error, {Code, Msg}});
{error, Reason} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
[?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]),
reply(From, Fun, {error, Reason});
{'EXIT', Reason, Stk} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~p",
[?CONN_ADAPTER_MOD, Fun, Req, Options, Reason, Stk]),
reply(From, Fun, {error, Reason})
end,
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
reply(Pid, Fun, Result) ->
Pid ! {hreply, Fun, Result}.

View File

@ -1,154 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% The gRPC server for ConnectionAdapter
-module(emqx_exproto_gsvr).
-behavior(emqx_exproto_v_1_connection_adapter_bhvr).
-include("emqx_exproto.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[ExProto gServer]").
-define(IS_QOS(X), (X =:= 0 orelse X =:= 1 orelse X =:= 2)).
%% gRPC server callbacks
-export([ send/2
, close/2
, authenticate/2
, start_timer/2
, publish/2
, subscribe/2
, unsubscribe/2
]).
%%--------------------------------------------------------------------
%% gRPC ConnectionAdapter service
%%--------------------------------------------------------------------
-spec send(ctx:ctx(), emqx_exproto_pb:send_bytes_request())
-> {ok, emqx_exproto_pb:code_response(), ctx:ctx()}
| grpcbox_stream:grpc_error_response().
send(Ctx, Req = #{conn := Conn, bytes := Bytes}) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
{ok, response(call(Conn, {send, Bytes})), Ctx}.
-spec close(ctx:ctx(), emqx_exproto_pb:close_socket_request())
-> {ok, emqx_exproto_pb:code_response(), ctx:ctx()}
| grpcbox_stream:grpc_error_response().
close(Ctx, Req = #{conn := Conn}) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
{ok, response(call(Conn, close)), Ctx}.
-spec authenticate(ctx:ctx(), emqx_exproto_pb:authenticate_request())
-> {ok, emqx_exproto_pb:code_response(), ctx:ctx()}
| grpcbox_stream:grpc_error_response().
authenticate(Ctx, Req = #{conn := Conn,
password := Password,
clientinfo := ClientInfo}) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
case validate(clientinfo, ClientInfo) of
false ->
{ok, response({error, ?RESP_REQUIRED_PARAMS_MISSED}), Ctx};
_ ->
{ok, response(call(Conn, {auth, ClientInfo, Password})), Ctx}
end.
-spec start_timer(ctx:ctx(), emqx_exproto_pb:publish_request())
-> {ok, emqx_exproto_pb:code_response(), ctx:ctx()}
| grpcbox_stream:grpc_error_response().
start_timer(Ctx, Req = #{conn := Conn, type := Type, interval := Interval})
when Type =:= 'KEEPALIVE' andalso Interval > 0 ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
{ok, response(call(Conn, {start_timer, keepalive, Interval})), Ctx};
start_timer(Ctx, Req) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Ctx}.
-spec publish(ctx:ctx(), emqx_exproto_pb:publish_request())
-> {ok, emqx_exproto_pb:code_response(), ctx:ctx()}
| grpcbox_stream:grpc_error_response().
publish(Ctx, Req = #{conn := Conn, topic := Topic, qos := Qos, payload := Payload})
when ?IS_QOS(Qos) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
{ok, response(call(Conn, {publish, Topic, Qos, Payload})), Ctx};
publish(Ctx, Req) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Ctx}.
-spec subscribe(ctx:ctx(), emqx_exproto_pb:subscribe_request())
-> {ok, emqx_exproto_pb:code_response(), ctx:ctx()}
| grpcbox_stream:grpc_error_response().
subscribe(Ctx, Req = #{conn := Conn, topic := Topic, qos := Qos})
when ?IS_QOS(Qos) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
{ok, response(call(Conn, {subscribe, Topic, Qos})), Ctx};
subscribe(Ctx, Req) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Ctx}.
-spec unsubscribe(ctx:ctx(), emqx_exproto_pb:unsubscribe_request())
-> {ok, emqx_exproto_pb:code_response(), ctx:ctx()}
| grpcbox_stream:grpc_error_response().
unsubscribe(Ctx, Req = #{conn := Conn, topic := Topic}) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
{ok, response(call(Conn, {unsubscribe, Topic})), Ctx}.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
to_pid(ConnStr) ->
list_to_pid(binary_to_list(ConnStr)).
call(ConnStr, Req) ->
case catch to_pid(ConnStr) of
{'EXIT', {badarg, _}} ->
{error, ?RESP_PARAMS_TYPE_ERROR,
<<"The conn type error">>};
Pid when is_pid(Pid) ->
case erlang:is_process_alive(Pid) of
true ->
emqx_exproto_conn:call(Pid, Req);
false ->
{error, ?RESP_CONN_PROCESS_NOT_ALIVE,
<<"Connection process is not alive">>}
end
end.
%%--------------------------------------------------------------------
%% Data types
stringfy(Reason) ->
unicode:characters_to_binary((io_lib:format("~0p", [Reason]))).
validate(clientinfo, M) ->
Required = [proto_name, proto_ver, clientid],
lists:all(fun(K) -> maps:is_key(K, M) end, Required).
response(ok) ->
#{code => ?RESP_SUCCESS};
response({error, Code, Reason})
when ?IS_GRPC_RESULT_CODE(Code) ->
#{code => Code, message => stringfy(Reason)};
response({error, Code})
when ?IS_GRPC_RESULT_CODE(Code) ->
#{code => Code};
response(Other) ->
#{code => ?RESP_UNKNOWN, message => stringfy(Other)}.

View File

@ -20,67 +20,17 @@
-export([start_link/0]).
-export([ start_grpc_server/3
, stop_grpc_server/1
, start_grpc_client_channel/3
, stop_grpc_client_channel/1
]).
-export([init/1]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec start_grpc_server(atom(), inet:port_number(), list())
-> {ok, pid()} | {error, term()}.
start_grpc_server(Name, Port, SSLOptions) ->
ServerOpts = #{},
GrpcOpts = #{service_protos => [emqx_exproto_pb],
services => #{'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr}},
ListenOpts = #{port => Port, socket_options => [{reuseaddr, true}]},
PoolOpts = #{size => 8},
TransportOpts = maps:from_list(SSLOptions),
Spec = #{id => Name,
start => {grpcbox_services_sup, start_link,
[ServerOpts, GrpcOpts, ListenOpts,
PoolOpts, TransportOpts]},
type => supervisor,
restart => permanent,
shutdown => infinity},
supervisor:start_child(?MODULE, Spec).
-spec stop_grpc_server(atom()) -> ok.
stop_grpc_server(Name) ->
ok = supervisor:terminate_child(?MODULE, Name),
ok = supervisor:delete_child(?MODULE, Name).
-spec start_grpc_client_channel(
atom(),
[grpcbox_channel:endpoint()],
grpcbox_channel:options()) -> {ok, pid()} | {error, term()}.
start_grpc_client_channel(Name, Endpoints, Options0) ->
Options = Options0#{sync_start => true},
Spec = #{id => Name,
start => {grpcbox_channel, start_link, [Name, Endpoints, Options]},
type => worker},
supervisor:start_child(?MODULE, Spec).
-spec stop_grpc_client_channel(atom()) -> ok.
stop_grpc_client_channel(Name) ->
ok = supervisor:terminate_child(?MODULE, Name),
ok = supervisor:delete_child(?MODULE, Name).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([]) ->
%% gRPC Client Pool
PoolSize = emqx_vm:schedulers() * 2,
Pool = emqx_pool_sup:spec([exproto_gcli_pool, hash, PoolSize,
{emqx_exproto_gcli, start_link, []}]),
{ok, {{one_for_one, 10, 5}, [Pool]}}.
DriverMngr = #{id => driver_mngr,
start => {emqx_exproto_driver_mngr, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_exproto_driver_mngr]},
{ok, {{one_for_all, 10, 5}, [DriverMngr]}}.

View File

@ -19,20 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-import(emqx_exproto_echo_svr,
[ frame_connect/2
, frame_connack/1
, frame_publish/3
, frame_puback/1
, frame_subscribe/2
, frame_suback/1
, frame_unsubscribe/1
, frame_unsuback/1
, frame_disconnect/0
]).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-define(TCPOPTS, [binary, {active, false}]).
-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
@ -50,38 +37,48 @@ groups() ->
%% @private
metrics() ->
[tcp, ssl, udp, dtls].
[ list_to_atom(X ++ "_" ++ Y)
|| X <- ["python3", "java"], Y <- ["tcp", "ssl", "udp", "dtls"]].
init_per_group(GrpName, Cfg) ->
put(grpname, GrpName),
Svrs = emqx_exproto_echo_svr:start(),
init_per_group(GrpName, Config) ->
[Lang, LisType] = [list_to_atom(X) || X <- string:tokens(atom_to_list(GrpName), "_")],
put(grpname, {Lang, LisType}),
emqx_ct_helpers:start_apps([emqx_exproto], fun set_sepecial_cfg/1),
emqx_logger:set_log_level(debug),
[{servers, Svrs}, {listener_type, GrpName} | Cfg].
[{driver_type, Lang},
{listener_type, LisType} | Config].
end_per_group(_, Cfg) ->
emqx_ct_helpers:stop_apps([emqx_exproto]),
emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
end_per_group(_, _) ->
emqx_ct_helpers:stop_apps([emqx_exproto]).
set_sepecial_cfg(emqx_exproto) ->
LisType = get(grpname),
{Lang, LisType} = get(grpname),
Path = emqx_ct_helpers:deps_path(emqx_exproto, "example/"),
Listeners = application:get_env(emqx_exproto, listeners, []),
Driver = compile(Lang, Path),
SockOpts = socketopts(LisType),
UpgradeOpts = fun(Opts) ->
Opts2 = lists:keydelete(tcp_options, 1, Opts),
Opts1 = lists:keydelete(driver, 1, Opts),
Opts2 = lists:keydelete(tcp_options, 1, Opts1),
Opts3 = lists:keydelete(ssl_options, 1, Opts2),
Opts4 = lists:keydelete(udp_options, 1, Opts3),
Opts5 = lists:keydelete(dtls_options, 1, Opts4),
SockOpts ++ Opts5
Driver ++ SockOpts ++ Opts5
end,
NListeners = [{Proto, LisType, LisOn, UpgradeOpts(Opts)}
|| {Proto, _Type, LisOn, Opts} <- Listeners],
application:set_env(emqx_exproto, listeners, NListeners);
set_sepecial_cfg(emqx) ->
application:set_env(emqx, allow_anonymous, true),
application:set_env(emqx, enable_acl_cache, false),
set_sepecial_cfg(_App) ->
ok.
compile(java, Path) ->
ErlPortJar = emqx_ct_helpers:deps_path(erlport, "priv/java/_pkgs/erlport.jar"),
ct:pal(os:cmd(lists:concat(["cd ", Path, " && ",
"rm -rf Main.class State.class && ",
"javac -cp ", ErlPortJar, " Main.java"]))),
[{driver, [{type, java}, {path, Path}, {cbm, 'Main'}]}];
compile(python3, Path) ->
[{driver, [{type, python3}, {path, Path}, {cbm, main}]}].
%%--------------------------------------------------------------------
%% Tests cases
%%--------------------------------------------------------------------
@ -89,263 +86,24 @@ set_sepecial_cfg(emqx) ->
t_start_stop(_) ->
ok.
t_mountpoint_echo(Cfg) ->
t_echo(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Bin = rand_bytes(),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>,
mountpoint => <<"ct/">>
},
Password = <<"123456">>,
send(Sock, Bin),
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
SubBin = frame_subscribe(<<"t/#">>, 1),
SubAckBin = frame_suback(0),
send(Sock, SubBin),
{ok, SubAckBin} = recv(Sock, 5000),
emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)),
PubBin1 = frame_publish(<<"t/dn">>, 0, <<"echo">>),
{ok, PubBin1} = recv(Sock, 5000),
PubBin2 = frame_publish(<<"t/up">>, 0, <<"echo">>),
PubAckBin = frame_puback(0),
emqx:subscribe(<<"ct/t/up">>),
send(Sock, PubBin2),
{ok, PubAckBin} = recv(Sock, 5000),
receive
{deliver, _, _} -> ok
after 1000 ->
error(echo_not_running)
end,
close(Sock).
t_auth_deny(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,
fun(_) -> {error, ?RC_NOT_AUTHORIZED} end),
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(1),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
SockType =/= udp andalso begin
{error, closed} = recv(Sock, 5000)
end,
meck:unload([emqx_access_control]).
t_acl_deny(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> deny end),
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
SubBin = frame_subscribe(<<"t/#">>, 1),
SubAckBin = frame_suback(1),
send(Sock, SubBin),
{ok, SubAckBin} = recv(Sock, 5000),
{ok, Bin} = recv(Sock, byte_size(Bin), 5000),
%% pubsub echo
emqx:subscribe(<<"t/#">>),
emqx:publish(emqx_message:make(<<"t/dn">>, <<"echo">>)),
First = receive {_, _, X} -> X#message.payload end,
First = receive {_, _, Y} -> Y#message.payload end,
PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>),
PubBinFailedAck = frame_puback(1),
PubBinSuccesAck = frame_puback(0),
send(Sock, PubBin),
{ok, PubBinFailedAck} = recv(Sock, 5000),
meck:unload([emqx_access_control]),
send(Sock, PubBin),
{ok, PubBinSuccesAck} = recv(Sock, 5000),
close(Sock).
t_keepalive_timeout(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>,
keepalive => 2
},
Password = <<"123456">>,
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
DisconnectBin = frame_disconnect(),
{ok, DisconnectBin} = recv(Sock, 10000),
SockType =/= udp andalso begin
{error, closed} = recv(Sock, 5000)
end, ok.
t_hook_connected_disconnected(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
Parent = self(),
HookFun1 = fun(_, _) -> Parent ! connected, ok end,
HookFun2 = fun(_, _, _) -> Parent ! disconnected, ok end,
emqx:hook('client.connected', HookFun1),
emqx:hook('client.disconnected', HookFun2),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
receive
connected -> ok
after 1000 ->
error(hook_is_not_running)
end,
DisconnectBin = frame_disconnect(),
send(Sock, DisconnectBin),
receive
disconnected -> ok
after 1000 ->
error(hook_is_not_running)
end,
SockType =/= udp andalso begin
{error, closed} = recv(Sock, 5000)
end,
emqx:unhook('client.connected', HookFun1),
emqx:unhook('client.disconnected', HookFun2).
t_hook_session_subscribed_unsubscribed(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
Parent = self(),
HookFun1 = fun(_, _, _) -> Parent ! subscribed, ok end,
HookFun2 = fun(_, _, _) -> Parent ! unsubscribed, ok end,
emqx:hook('session.subscribed', HookFun1),
emqx:hook('session.unsubscribed', HookFun2),
SubBin = frame_subscribe(<<"t/#">>, 1),
SubAckBin = frame_suback(0),
send(Sock, SubBin),
{ok, SubAckBin} = recv(Sock, 5000),
receive
subscribed -> ok
after 1000 ->
error(hook_is_not_running)
end,
UnsubBin = frame_unsubscribe(<<"t/#">>),
UnsubAckBin = frame_unsuback(0),
send(Sock, UnsubBin),
{ok, UnsubAckBin} = recv(Sock, 5000),
receive
unsubscribed -> ok
after 1000 ->
error(hook_is_not_running)
end,
close(Sock),
emqx:unhook('session.subscribed', HookFun1),
emqx:unhook('session.unsubscribed', HookFun2).
t_hook_message_delivered(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
Client = #{proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
},
Password = <<"123456">>,
ConnBin = frame_connect(Client, Password),
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
SubBin = frame_subscribe(<<"t/#">>, 1),
SubAckBin = frame_suback(0),
send(Sock, SubBin),
{ok, SubAckBin} = recv(Sock, 5000),
HookFun1 = fun(_, Msg) -> {ok, Msg#message{payload = <<"2">>}} end,
emqx:hook('message.delivered', HookFun1),
emqx:publish(emqx_message:make(<<"t/dn">>, <<"1">>)),
PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>),
{ok, PubBin1} = recv(Sock, 5000),
close(Sock),
emqx:unhook('message.delivered', HookFun1).
%%--------------------------------------------------------------------
%% Utils
@ -379,15 +137,15 @@ send({ssl, Sock}, Bin) ->
send({dtls, Sock}, Bin) ->
ssl:send(Sock, Bin).
recv({tcp, Sock}, Ts) ->
gen_tcp:recv(Sock, 0, Ts);
recv({udp, Sock}, Ts) ->
{ok, {_, _, Bin}} = gen_udp:recv(Sock, 0, Ts),
recv({tcp, Sock}, Size, Ts) ->
gen_tcp:recv(Sock, Size, Ts);
recv({udp, Sock}, Size, Ts) ->
{ok, {_, _, Bin}} = gen_udp:recv(Sock, Size, Ts),
{ok, Bin};
recv({ssl, Sock}, Ts) ->
ssl:recv(Sock, 0, Ts);
recv({dtls, Sock}, Ts) ->
ssl:recv(Sock, 0, Ts).
recv({ssl, Sock}, Size, Ts) ->
ssl:recv(Sock, Size, Ts);
recv({dtls, Sock}, Size, Ts) ->
ssl:recv(Sock, Size, Ts).
close({tcp, Sock}) ->
gen_tcp:close(Sock);

View File

@ -1,3 +1,3 @@
{deps,
[{luerl, {git, "https://github.com/emqx/luerl", {tag, "v0.3.1"}}}
]}.
]}.

View File

@ -1,3 +1,3 @@
{deps,
[{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.0"}}}
]}.
[{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.1"}}}
]}.

View File

@ -457,7 +457,9 @@ do_subscribe(ClientId, TopicTables) ->
end.
%%TODO: ???
publish(Msg) -> emqx:publish(Msg).
publish(Msg) ->
emqx_metrics:inc_msg(Msg),
emqx:publish(Msg).
unsubscribe(ClientId, Topic) ->
unsubscribe(ekka_mnesia:running_nodes(), ClientId, Topic).

View File

@ -70,6 +70,10 @@
, delete/2
]).
-export([ get_list_exported/0
, do_import/1
]).
export(_Bindings, _Params) ->
Rules = emqx_mgmt:export_rules(),
Resources = emqx_mgmt:export_resources(),
@ -97,8 +101,11 @@ export(_Bindings, _Params) ->
{auth_username, AuthUsername},
{auth_mnesia, AuthMnesia},
{acl_mnesia, AclMnesia},
{schemas, Schemas}],
{schemas, Schemas}
],
Bin = emqx_json:encode(Data),
ok = filelib:ensure_dir(NFilename),
case file:write_file(NFilename, Bin) of
ok ->
case file:read_file_info(NFilename) of
@ -106,7 +113,9 @@ export(_Bindings, _Params) ->
CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S]),
return({ok, [{filename, list_to_binary(Filename)},
{size, Size},
{created_at, list_to_binary(CreatedAt)}]});
{created_at, list_to_binary(CreatedAt)},
{node, node()}
]});
{error, Reason} ->
return({error, Reason})
end;
@ -115,66 +124,86 @@ export(_Bindings, _Params) ->
end.
list_exported(_Bindings, _Params) ->
List = [ rpc:call(Node, ?MODULE, get_list_exported, []) || Node <- ekka_mnesia:running_nodes() ],
NList = lists:map(fun({_, FileInfo}) -> FileInfo end, lists:keysort(1, lists:append(List))),
return({ok, NList}).
get_list_exported() ->
Dir = emqx:get_env(data_dir),
{ok, Files} = file:list_dir_all(Dir),
List = lists:foldl(fun(File, Acc) ->
case filename:extension(File) =:= ".json" of
true ->
FullFile = filename:join([Dir, File]),
case file:read_file_info(FullFile) of
{ok, #file_info{size = Size, ctime = CTime = {{Y, M, D}, {H, MM, S}}}} ->
CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S]),
Seconds = calendar:datetime_to_gregorian_seconds(CTime),
[{Seconds, [{filename, list_to_binary(File)},
{size, Size},
{created_at, list_to_binary(CreatedAt)}]} | Acc];
{error, Reason} ->
logger:error("Read file info of ~s failed with: ~p", [File, Reason]),
Acc
end;
false ->
Acc
end
end, [], Files),
NList = lists:map(fun({_, FileInfo}) -> FileInfo end, lists:keysort(1, List)),
return({ok, NList}).
lists:foldl(
fun(File, Acc) ->
case filename:extension(File) =:= ".json" of
true ->
FullFile = filename:join([Dir, File]),
case file:read_file_info(FullFile) of
{ok, #file_info{size = Size, ctime = CTime = {{Y, M, D}, {H, MM, S}}}} ->
CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S]),
Seconds = calendar:datetime_to_gregorian_seconds(CTime),
[{Seconds, [{filename, list_to_binary(File)},
{size, Size},
{created_at, list_to_binary(CreatedAt)},
{node, node()}
]} | Acc];
{error, Reason} ->
logger:error("Read file info of ~s failed with: ~p", [File, Reason]),
Acc
end;
false -> Acc
end
end, [], Files).
import(_Bindings, Params) ->
case proplists:get_value(<<"filename">>, Params) of
undefined ->
return({error, missing_required_params});
Filename ->
FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
case file:read_file(FullFilename) of
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = emqx_mgmt:to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
emqx_mgmt:import_resources(maps:get(<<"resources">>, Data, [])),
emqx_mgmt:import_rules(maps:get(<<"rules">>, Data, [])),
emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])),
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, [])),
emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, [])),
emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])),
logger:debug("The emqx data has been imported successfully"),
return()
catch Class:Reason:Stack ->
logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
return({error, import_failed})
end;
false ->
logger:error("Unsupported version: ~p", [Version]),
return({error, unsupported_version})
Result = case proplists:get_value(<<"node">>, Params) of
undefined -> do_import(Filename);
Node ->
case lists:member(Node,
[ erlang:atom_to_binary(N, utf8) || N <- ekka_mnesia:running_nodes() ]
) of
true -> rpc:call(erlang:binary_to_atom(Node, utf8), ?MODULE, do_import, [Filename]);
false -> return({error, no_existent_node})
end
end,
return(Result)
end.
do_import(Filename) ->
FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
case file:read_file(FullFilename) of
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = emqx_mgmt:to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
emqx_mgmt:import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])),
emqx_mgmt:import_resources(maps:get(<<"resources">>, Data, [])),
emqx_mgmt:import_rules(maps:get(<<"rules">>, Data, [])),
emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])),
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
emqx_mgmt:import_modules(maps:get(<<"modules">>, Data, [])),
emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])),
logger:debug("The emqx data has been imported successfully"),
ok
catch Class:Reason:Stack ->
logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
{error, import_failed}
end;
{error, Reason} ->
return({error, Reason})
end
false ->
logger:error("Unsupported version: ~p", [Version]),
{error, unsupported_version}
end;
{error, Reason} ->
{error, Reason}
end.
download(#{filename := Filename}, _Params) ->
@ -195,7 +224,7 @@ do_upload(_Bindings, #{<<"filename">> := Filename,
FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
case file:write_file(FullFilename, Bin) of
ok ->
return();
return({ok, [{node, node()}]});
{error, Reason} ->
return({error, Reason})
end;
@ -214,4 +243,4 @@ delete(#{filename := Filename}, _Params) ->
return();
{error, Reason} ->
return({error, Reason})
end.
end.

View File

@ -585,6 +585,7 @@ data(["export"]) ->
{auth_mnesia, AuthMnesia},
{acl_mnesia, AclMnesia},
{schemas, Schemas}],
ok = filelib:ensure_dir(NFilename),
case file:write_file(NFilename, emqx_json:encode(Data)) of
ok ->
emqx_ctl:print("The emqx data has been successfully exported to ~s.~n", [NFilename]);

View File

@ -53,7 +53,8 @@ groups() ->
acl_cache,
pubsub,
routes_and_subscriptions,
stats]}].
stats,
data]}].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx, emqx_management, emqx_reloader]),
@ -65,6 +66,36 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_reloader, emqx_management, emqx]),
ekka_mnesia:ensure_stopped().
init_per_testcase(data, Config) ->
ok = emqx_dashboard_admin:mnesia(boot),
application:ensure_all_started(emqx_dahboard),
ok = emqx_rule_registry:mnesia(boot),
application:ensure_all_started(emqx_rule_engine),
meck:new(emqx_sys, [passthrough, no_history]),
meck:expect(emqx_sys, version, 0,
fun() ->
Tag =os:cmd("git describe --abbrev=0 --tags") -- "\n",
re:replace(Tag, "[v|e]", "", [{return ,list}])
end),
Config;
init_per_testcase(_, Config) ->
Config.
stop_pre_testcase(data, _Config) ->
application:stop(emqx_dahboard),
application:stop(emqx_rule_engine),
application:stop(emqx_modules),
application:stop(emqx_schema_registry),
application:stop(emqx_conf),
meck:unload(emqx_sys),
ok;
stop_pre_testcase(_, _Config) ->
ok.
get(Key, ResponseBody) ->
maps:get(Key, jiffy:decode(list_to_binary(ResponseBody), [return_maps])).
@ -432,6 +463,10 @@ acl_cache(_) ->
ok = emqtt:disconnect(C1).
pubsub(_) ->
Qos1Received = emqx_metrics:val('messages.qos1.received'),
Qos2Received = emqx_metrics:val('messages.qos2.received'),
Received = emqx_metrics:val('messages.received'),
ClientId = <<"client1">>,
Options = #{clientid => ClientId,
proto_ver => 5},
@ -532,7 +567,11 @@ pubsub(_) ->
{ok, Data3} = request_api(post, api_path(["mqtt/unsubscribe_batch"]), [], auth_header_(), Body3),
loop(maps:get(<<"data">>, jiffy:decode(list_to_binary(Data3), [return_maps]))),
ok = emqtt:disconnect(C1).
ok = emqtt:disconnect(C1),
?assertEqual(2, emqx_metrics:val('messages.qos1.received') - Qos1Received),
?assertEqual(2, emqx_metrics:val('messages.qos2.received') - Qos2Received),
?assertEqual(4, emqx_metrics:val('messages.received') - Received).
loop([]) -> [];
@ -596,6 +635,17 @@ stats(_) ->
?assertEqual(<<"undefined">>, get(<<"message">>, Return)),
meck:unload(emqx_mgmt).
data(_) ->
{ok, Data} = request_api(post, api_path(["data","export"]), [], auth_header_(), [#{}]),
#{<<"filename">> := Filename, <<"node">> := Node} = emqx_ct_http:get_http_data(Data),
{ok, DataList} = request_api(get, api_path(["data","export"]), auth_header_()),
?assertEqual(true, lists:member(emqx_ct_http:get_http_data(Data), emqx_ct_http:get_http_data(DataList))),
?assertMatch({ok, _}, request_api(post, api_path(["data","import"]), [], auth_header_(), #{<<"filename">> => Filename, <<"node">> => Node})),
?assertMatch({ok, _}, request_api(post, api_path(["data","import"]), [], auth_header_(), #{<<"filename">> => Filename})),
ok.
request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []).

View File

@ -1,7 +1,7 @@
{deps, [
{bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}
]}.
{deps,
[{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.3"}}}
]}.
{plugins, [
{pc, {git, "https://github.com/emqx/port_compiler.git", {tag, "v1.11.1"}}}
]}.
]}.

View File

@ -1,9 +1,9 @@
{application, emqx_passwd,
[{description, "Password Hash Library for EMQ X Broker"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "0.1.1"}, % strict semver, bump manually!
{modules, ['emqx_passwd']},
{registered, []},
{applications, [kernel,stdlib,ssl,pbkdf2,bcrypt,emqx]},
{applications, [kernel,stdlib,ssl,pbkdf2,emqx]},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},

View File

@ -1,3 +1,3 @@
{deps,
[{prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v3.1.1"}}}
]}.
]}.

View File

@ -1 +1 @@
{deps, []}.
{deps, []}.

View File

@ -1,3 +1,3 @@
{deps, [
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.0"}}}
]}.
]}.

View File

@ -1 +1 @@
{deps, []}.
{deps, []}.

View File

@ -1 +1 @@
{deps, []}.
{deps, []}.

View File

@ -134,35 +134,37 @@ on_action_create_republish(Id, #{<<"target_topic">> := TargetTopic, <<"target_qo
(Selected, _Envs = #{qos := QoS, flags := Flags, timestamp := Timestamp}) ->
?LOG(debug, "[republish] republish to: ~p, Payload: ~p",
[TargetTopic, Selected]),
emqx_broker:safe_publish(
emqx_message:set_headers(
#{republish_by => Id},
#message{
id = emqx_guid:gen(),
qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end,
from = Id,
flags = Flags,
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = emqx_rule_utils:proc_tmpl(PayloadTks, Selected),
timestamp = Timestamp
})
);
increase_and_publish(
#message{
id = emqx_guid:gen(),
qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end,
from = Id,
flags = Flags,
headers = #{republish_by => Id},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = emqx_rule_utils:proc_tmpl(PayloadTks, Selected),
timestamp = Timestamp
});
%% in case this is not a "message.publish" request
(Selected, _Envs) ->
?LOG(debug, "[republish] republish to: ~p, Payload: ~p",
[TargetTopic, Selected]),
emqx_broker:safe_publish(
#message{
id = emqx_guid:gen(),
qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end,
from = Id,
flags = #{dup => false, retain => false},
headers = #{republish_by => Id},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = emqx_rule_utils:proc_tmpl(PayloadTks, Selected),
timestamp = erlang:system_time(millisecond)
})
increase_and_publish(
#message{
id = emqx_guid:gen(),
qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end,
from = Id,
flags = #{dup => false, retain => false},
headers = #{republish_by => Id},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = emqx_rule_utils:proc_tmpl(PayloadTks, Selected),
timestamp = erlang:system_time(millisecond)
})
end.
increase_and_publish(Msg) ->
emqx_metrics:inc_msg(Msg),
emqx_broker:safe_publish(Msg).
on_action_do_nothing(_, _) ->
fun(_, _) -> ok end.

View File

@ -420,7 +420,7 @@ action_instance_id(ActionName) ->
iolist_to_binary([atom_to_list(ActionName), "_", integer_to_list(erlang:system_time())]).
cluster_call(Func, Args) ->
case rpc:multicall([node() | nodes()], ?MODULE, Func, Args, 5000) of
case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of
{ResL, []} ->
case lists:filter(fun(ok) -> false; (_) -> true end, ResL) of
[] -> ok;

View File

@ -130,10 +130,18 @@
]).
%% Map Funcs
-export([ map_new/0
]).
-export([ map_get/2
, map_get/3
, map_put/3
, map_new/0
]).
%% For backword compatibility
-export([ mget/2
, mget/3
, mput/3
]).
%% Array Funcs
@ -225,7 +233,7 @@ payload() ->
payload(Path) ->
fun(#{payload := Payload}) when erlang:is_map(Payload) ->
emqx_rule_maps:nested_get(map_path(Path), Payload);
map_get(Path, Payload);
(_) -> undefined
end.
@ -599,6 +607,15 @@ map_get(Key, Map) ->
map_get(Key, Map, undefined).
map_get(Key, Map, Default) ->
emqx_rule_maps:nested_get(map_path(Key), Map, Default).
map_put(Key, Val, Map) ->
emqx_rule_maps:nested_put(map_path(Key), Val, Map).
mget(Key, Map) ->
mget(Key, Map, undefined).
mget(Key, Map, Default) ->
case maps:find(Key, Map) of
{ok, Val} -> Val;
error when is_atom(Key) ->
@ -622,7 +639,7 @@ map_get(Key, Map, Default) ->
Default
end.
map_put(Key, Val, Map) ->
mput(Key, Val, Map) ->
case maps:find(Key, Map) of
{ok, _} -> maps:put(Key, Val, Map);
error when is_atom(Key) ->

View File

@ -25,7 +25,7 @@
-type(params_spec() :: #{atom() => term()}).
-type(params() :: #{binary() => term()}).
-define(DATA_TYPES, [string, number, float, boolean, object, array]).
-define(DATA_TYPES, [string, number, float, boolean, object, array, file]).
%%------------------------------------------------------------------------------
%% APIs
@ -68,6 +68,8 @@ do_validate_param(Val, Spec = #{type := Type}) ->
end,
validate_type(Val, Type, Spec).
validate_type(Val, file, _Spec) ->
ok = validate_file(Val);
validate_type(Val, string, Spec) ->
ok = validate_string(Val, reg_exp(maps:get(format, Spec, any)));
validate_type(Val, number, Spec) ->
@ -110,6 +112,9 @@ validate_boolean(true) -> ok;
validate_boolean(false) -> ok;
validate_boolean(Val) -> error({invalid_data_type, {boolean, Val}}).
validate_file(Val) when is_binary(Val) -> ok;
validate_file(Val) -> error({invalid_data_type, {file, Val}}).
reg_exp(url) -> "^https?://\\w+(\.\\w+)*(:[0-9]+)?";
reg_exp(topic) -> "^/?(\\w|\\#|\\+)+(/?(\\w|\\#|\\+))*/?$";
reg_exp(resource_type) -> "[a-zA-Z0-9_:-]";

View File

@ -51,7 +51,7 @@ groups() ->
]},
{actions, [],
[t_inspect_action
%,t_republish_action
,t_republish_action
]},
{api, [],
[t_crud_rule_api,
@ -325,12 +325,14 @@ t_inspect_action(_Config) ->
ok.
t_republish_action(_Config) ->
Qos0Received = emqx_metrics:val('messages.qos0.received'),
Received = emqx_metrics:val('messages.received'),
ok = emqx_rule_engine:load_providers(),
{ok, #rule{id = Id, for = [<<"t1">>]}} =
emqx_rule_engine:create_rule(
#{rawsql => <<"select topic, payload, qos from \"t1\"">>,
actions => [#{name => 'republish',
args => #{<<"target_topic">> => <<"t1">>,
args => #{<<"target_topic">> => <<"t2">>,
<<"target_qos">> => -1,
<<"payload_tmpl">> => <<"${payload}">>}}],
description => <<"builtin-republish-rule">>}),
@ -347,6 +349,8 @@ t_republish_action(_Config) ->
end,
emqtt:stop(Client),
emqx_rule_registry:remove_rule(Id),
?assertEqual(2, emqx_metrics:val('messages.qos0.received') - Qos0Received ),
?assertEqual(2, emqx_metrics:val('messages.received') - Received),
ok.
%%------------------------------------------------------------------------------

View File

@ -489,12 +489,27 @@ t_contains(_) ->
t_map_get(_) ->
?assertEqual(1, apply_func(map_get, [<<"a">>, #{a => 1}])),
?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])).
?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])),
?assertEqual(1, apply_func(map_get, [<<"a.b">>, #{a => #{b => 1}}])),
?assertEqual(undefined, apply_func(map_get, [<<"a.c">>, #{a => #{b => 1}}])).
t_map_put(_) ->
?assertEqual(#{<<"a">> => 1}, apply_func(map_put, [<<"a">>, 1, #{}])),
?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])),
?assertEqual(#{<<"a">> => #{<<"b">> => 1}}, apply_func(map_put, [<<"a.b">>, 1, #{}])),
?assertEqual(#{a => #{b => 1, <<"c">> => 1}}, apply_func(map_put, [<<"a.c">>, 1, #{a => #{b => 1}}])).
t_mget(_) ->
?assertEqual(1, apply_func(map_get, [<<"a">>, #{a => 1}])),
?assertEqual(1, apply_func(map_get, [<<"a">>, #{<<"a">> => 1}])),
?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])).
t_mput(_) ->
?assertEqual(#{<<"a">> => 1}, apply_func(map_put, [<<"a">>, 1, #{}])),
?assertEqual(#{<<"a">> => 2}, apply_func(map_put, [<<"a">>, 2, #{<<"a">> => 1}])),
?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])).
%%------------------------------------------------------------------------------
%% Test cases for Hash funcs
%%------------------------------------------------------------------------------

View File

@ -1 +1 @@
{deps, []}.
{deps, []}.

View File

@ -92,8 +92,8 @@ get(_Bindings, #{<<"mechanism">> := Mechanism0,
case Mechanism of
<<"SCRAM-SHA-1">> ->
case emqx_sasl_scram:lookup(Username) of
{ok, AuthInfo} ->
return({ok, AuthInfo});
{ok, AuthInfo = #{salt := Salt}} ->
return({ok, AuthInfo#{salt => base64:decode(Salt)}});
{error, Reason} ->
return({error, Reason})
end;

View File

@ -70,7 +70,7 @@ cli(["scram", "lookup", Username0]) ->
salt := Salt,
iteration_count := IterationCount}} ->
emqx_ctl:print("Username: ~s, Stored Key: ~s, Server Key: ~s, Salt: ~s, Iteration Count: ~p~n",
[Username, StoredKey, ServerKey, Salt, IterationCount]);
[Username, StoredKey, ServerKey, base64:decode(Salt), IterationCount]);
{error, not_found} ->
emqx_ctl:print("Authentication information not found~n")
end;

View File

@ -77,3 +77,64 @@ t_scram(_) ->
{ok, {ok, ServerFinal, #{}}} = emqx_sasl:check(AuthMethod, ClientFinal, Cache),
{ok, _} = emqx_sasl:check(AuthMethod, ServerFinal, ClientCache).
t_proto(_) ->
process_flag(trap_exit, true),
Username = <<"username">>,
Password = <<"password">>,
Salt = <<"emqx">>,
AuthMethod = <<"SCRAM-SHA-1">>,
{ok, Client0} = emqtt:start_link([{clean_start, true},
{proto_ver, v5},
{enhanced_auth, #{method => AuthMethod,
params => #{username => Username,
password => Password,
salt => Salt}}},
{connect_timeout, 6000}]),
{error,{not_authorized,#{}}} = emqtt:connect(Client0),
ok = emqx_sasl_scram:add(Username, Password, Salt),
{ok, Client1} = emqtt:start_link([{clean_start, true},
{proto_ver, v5},
{enhanced_auth, #{method => AuthMethod,
params => #{username => Username,
password => Password,
salt => Salt}}},
{connect_timeout, 6000}]),
{ok, _} = emqtt:connect(Client1),
timer:sleep(200),
ok = emqtt:reauthentication(Client1, #{params => #{username => Username,
password => Password,
salt => Salt}}),
timer:sleep(200),
ErrorFun = fun (_State) -> {ok, <<>>, #{}} end,
ok = emqtt:reauthentication(Client1, #{params => #{},function => ErrorFun}),
receive
{disconnected,ReasonCode2,#{}} ->
?assertEqual(ReasonCode2, 135)
after 500 ->
error("emqx re-authentication failed")
end,
{ok, Client2} = emqtt:start_link([{clean_start, true},
{proto_ver, v5},
{enhanced_auth, #{method => AuthMethod,
params => #{},
function =>fun (_State) -> {ok, <<>>, #{}} end}},
{connect_timeout, 6000}]),
{error,{not_authorized,#{}}} = emqtt:connect(Client2),
receive_msg(),
process_flag(trap_exit, false).
receive_msg() ->
receive
{'EXIT', Msg} ->
ct:print("==========+~p~n", [Msg]),
receive_msg()
after 200 -> ok
end.

View File

@ -1 +1 @@
{deps, []}.
{deps, []}.

View File

@ -201,8 +201,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId);
true -> <<TopicId:16>>
end,
Msg = emqx_message:make({?NEG_QOS_CLIENT_ID, State#state.username},
?QOS_0, TopicName, Data),
Msg = emqx_message:make(?NEG_QOS_CLIENT_ID, ?QOS_0, TopicName, Data),
(TopicName =/= undefined) andalso emqx_broker:publish(Msg),
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId], State),
{keep_state_and_data, State#state.idle_timeout};
@ -942,14 +941,13 @@ do_publish_will(#state{will_msg = #will_msg{payload = undefined}}) ->
ok;
do_publish_will(#state{will_msg = #will_msg{topic = undefined}}) ->
ok;
do_publish_will(#state{channel = Channel, will_msg = WillMsg}) ->
do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) ->
#will_msg{qos = QoS, retain = Retain, topic = Topic, payload = Payload} = WillMsg,
Publish = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = false,
qos = QoS, retain = Retain},
variable = #mqtt_packet_publish{topic_name = Topic, packet_id = 1000},
payload = Payload},
ClientInfo = emqx_channel:info(clientinfo, Channel),
emqx_broker:publish(emqx_packet:to_message(ClientInfo, Publish)),
emqx_broker:publish(emqx_packet:to_message(Publish, ClientId)),
ok.
do_puback(TopicId, MsgId, ReturnCode, _StateName,

View File

@ -1 +1 @@
{deps, []}.
{deps, []}.

View File

@ -1 +1 @@
{deps, []}.
{deps, []}.

View File

@ -1 +1 @@
{deps, []}.
{deps, []}.

View File

@ -22,27 +22,40 @@
-define(RESOURCE_TYPE_WEBHOOK, 'web_hook').
-define(RESOURCE_CONFIG_SPEC, #{
url => #{type => string,
url => #{order => 1,
type => string,
format => url,
required => true,
title => #{en => <<"Request URL">>,
zh => <<"请求 URL"/utf8>>},
description => #{en => <<"Request URL">>,
zh => <<"请求 URL"/utf8>>}},
headers => #{type => object,
schema => #{},
default => #{},
title => #{en => <<"Request Header">>,
zh => <<"请求头"/utf8>>},
description => #{en => <<"Request Header">>,
zh => <<"请求头"/utf8>>}},
method => #{type => string,
method => #{order => 2,
type => string,
enum => [<<"PUT">>,<<"POST">>,<<"GET">>,<<"DELETE">>],
default => <<"POST">>,
title => #{en => <<"Request Method">>,
zh => <<"请求方法"/utf8>>},
description => #{en => <<"Request Method. Note that the payload_template will be discarded in case of GET method">>,
zh => <<"请求方法。注意:当请求方法为 GET 的时候payload_template 参数会被忽略"/utf8>>}}
description => #{en => <<"Request Method. \n"
"Note that: the Payload Template of Action will be discarded in case of GET method">>,
zh => <<"请求方法。\n"
"注意:当方法为 GET 时,动作中的 '消息内容模板' 参数会被忽略"/utf8>>}},
content_type => #{order => 3,
type => string,
enum => [<<"application/json">>,<<"text/plain;charset=UTF-8">>],
default => <<"application/json">>,
title => #{en => <<"Content-Type">>,
zh => <<"Content-Type"/utf8>>},
description => #{en => <<"The Content-Type of HTTP Request">>,
zh => <<"HTTP 请求头中的 Content-Type 字段值"/utf8>>}},
headers => #{order => 4,
type => object,
schema => #{},
default => #{},
title => #{en => <<"Request Header">>,
zh => <<"请求头"/utf8>>},
description => #{en => <<"The custom HTTP request headers">>,
zh => <<"自定义的 HTTP 请求头列表"/utf8>>}}
}).
-define(ACTION_PARAM_RESOURCE, #{
@ -57,18 +70,32 @@
-define(ACTION_DATA_SPEC, #{
'$resource' => ?ACTION_PARAM_RESOURCE,
path => #{order => 1,
type => string,
required => false,
default => <<>>,
title => #{en => <<"Path">>,
zh => <<"Path"/utf8>>},
description => #{en => <<"A path component, variable interpolation from "
"SQL statement is supported. This value will be "
"concatenated with Request URL.">>,
zh => <<"URL 的路径配置,支持使用 ${} 获取规则输出的字段值。\n"
"例如:${clientid}。该值会与 Request URL 组成一个完整的 URL"/utf8>>}
},
payload_tmpl => #{
order => 1,
order => 2,
type => string,
input => textarea,
required => false,
default => <<"">>,
title => #{en => <<"Payload Template">>,
zh => <<"消息内容模板"/utf8>>},
description => #{en => <<"The payload template, variable interpolation is supported. If using empty template (default), then the payload will be all the available vars in JOSN format">>,
zh => <<"消息内容模板,支持变量。若使用空模板(默认),消息内容为 JSON 格式的所有字段"/utf8>>}
}
}).
description => #{en => <<"The payload template, variable interpolation is supported."
"If using empty template (default), then the payload will "
"be all the available vars in JSON format">>,
zh => <<"消息内容模板,支持使用 ${} 获取变量值。"
"默认消息内容为规则输出的所有字段的 JSON 字符串"/utf8>>}}
}).
-resource_type(#{name => ?RESOURCE_TYPE_WEBHOOK,
create => on_resource_create,
@ -139,11 +166,13 @@ on_resource_destroy(_ResId, _Params) ->
%% An action that forwards publish messages to a remote web server.
-spec(on_action_create_data_to_webserver(Id::binary(), #{url() := string()}) -> action_fun()).
on_action_create_data_to_webserver(_Id, Params) ->
#{url := Url, headers := Headers, method := Method, payload_tmpl := PayloadTmpl}
#{url := Url, headers := Headers, method := Method, content_type := ContentType, payload_tmpl := PayloadTmpl, path := Path}
= parse_action_params(Params),
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
PathTks = emqx_rule_utils:preproc_tmpl(Path),
fun(Selected, _Envs) ->
http_request(Url, Headers, Method, format_msg(PayloadTks, Selected))
FullUrl = Url ++ emqx_rule_utils:proc_tmpl(PathTks, Selected),
http_request(FullUrl, Headers, Method, ContentType, format_msg(PayloadTks, Selected))
end.
format_msg([], Data) ->
@ -155,15 +184,15 @@ format_msg(Tokens, Data) ->
%% Internal functions
%%------------------------------------------------------------------------------
create_req(get, Url, Headers, _) ->
create_req(get, Url, Headers, _, _) ->
{(Url), (Headers)};
create_req(_, Url, Headers, Body) ->
{(Url), (Headers), "application/json", (Body)}.
create_req(_, Url, Headers, ContentType, Body) ->
{(Url), (Headers), binary_to_list(ContentType), (Body)}.
http_request(Url, Headers, Method, Params) ->
logger:debug("[WebHook Action] ~s to ~s, headers: ~p, body: ~p", [Method, Url, Headers, Params]),
case do_http_request(Method, create_req(Method, Url, Headers, Params),
http_request(Url, Headers, Method, ContentType, Params) ->
logger:debug("[WebHook Action] ~s to ~s, headers: ~p, content-type: ~p, body: ~p", [Method, Url, Headers, ContentType, Params]),
case do_http_request(Method, create_req(Method, Url, Headers, ContentType, Params),
[{timeout, 5000}], [], 0) of
{ok, _} -> ok;
{error, Reason} ->
@ -185,7 +214,9 @@ parse_action_params(Params = #{<<"url">> := Url}) ->
#{url => str(Url),
headers => headers(maps:get(<<"headers">>, Params, undefined)),
method => method(maps:get(<<"method">>, Params, <<"POST">>)),
payload_tmpl => maps:get(<<"payload_tmpl">>, Params, <<>>)}
content_type => maps:get(<<"content_type">>, Params, <<"application/json">>),
payload_tmpl => maps:get(<<"payload_tmpl">>, Params, <<>>),
path => maps:get(<<"path">>, Params, <<>>)}
catch _:_ ->
throw({invalid_params, Params})
end.

View File

@ -76,7 +76,7 @@
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.1"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "v0.4.2"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.0"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.2.0"}}}
, {erlport, {git, "https://github.com/emqx/erlport", {tag, "v1.2.2"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.3"}}}