Merge pull request #4550 from z8674558/merge-master-to-v5

Merge master to v5
This commit is contained in:
Zaiming (Stone) Shi 2021-04-12 20:38:25 +02:00 committed by GitHub
commit 6732e77135
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
75 changed files with 1947 additions and 511 deletions

View File

@ -1,7 +1,7 @@
version: '3.9'
services:
redis_cluster:
redis_server:
image: redis:${REDIS_TAG}
container_name: redis
volumes:

View File

@ -1,7 +1,7 @@
version: '3.9'
services:
redis_cluster:
redis_server:
container_name: redis
image: redis:${REDIS_TAG}
volumes:

View File

@ -1,7 +1,7 @@
version: '3.9'
services:
redis_cluster:
redis_server:
container_name: redis
image: redis:${REDIS_TAG}
volumes:

View File

@ -0,0 +1,12 @@
version: '3.9'
services:
redis_server:
container_name: redis
image: redis:${REDIS_TAG}
volumes:
- ../../apps/emqx_auth_redis/test/emqx_auth_redis_SUITE_data/certs:/tls
- ./redis/:/data/conf
command: bash -c "/bin/bash /data/conf/redis.sh --node sentinel --tls-enabled && tail -f /var/log/redis-server.log"
networks:
- emqx_bridge

View File

@ -4,4 +4,7 @@ logfile /var/log/redis-server.log
tls-cert-file /tls/redis.crt
tls-key-file /tls/redis.key
tls-ca-cert-file /tls/ca.crt
tls-replication yes
protected-mode no
requirepass public
masterauth public

View File

@ -2,3 +2,4 @@ daemonize yes
bind 0.0.0.0 ::
logfile /var/log/redis-server.log
requirepass public
masterauth public

View File

@ -49,31 +49,41 @@ if [ "${node}" = "cluster" ] ; then
redis-server /data/conf/redis.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf --cluster-enabled yes;
fi
elif [ "${node}" = "sentinel" ] ; then
if $tls ; then
redis-server /data/conf/redis-tls.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf \
--tls-port 8000 --cluster-enabled no;
redis-server /data/conf/redis-tls.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf \
--tls-port 8001 --cluster-enabled no --slaveof "$LOCAL_IP" 8000;
redis-server /data/conf/redis-tls.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf \
--tls-port 8002 --cluster-enabled no --slaveof "$LOCAL_IP" 8000;
else
redis-server /data/conf/redis.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf \
--cluster-enabled no;
redis-server /data/conf/redis.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf \
--cluster-enabled no --slaveof "$LOCAL_IP" 7000;
redis-server /data/conf/redis.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf \
--cluster-enabled no --slaveof "$LOCAL_IP" 7000;
fi
fi
REDIS_LOAD_FLG=true;
while $REDIS_LOAD_FLG;
do
sleep 1;
redis-cli -p 7000 info 1> /data/conf/r7000i.log 2> /dev/null;
redis-cli --pass public --no-auth-warning -p 7000 info 1> /data/conf/r7000i.log 2> /dev/null;
if [ -s /data/conf/r7000i.log ]; then
:
else
continue;
fi
redis-cli -p 7001 info 1> /data/conf/r7001i.log 2> /dev/null;
redis-cli --pass public --no-auth-warning -p 7001 info 1> /data/conf/r7001i.log 2> /dev/null;
if [ -s /data/conf/r7001i.log ]; then
:
else
continue;
fi
redis-cli -p 7002 info 1> /data/conf/r7002i.log 2> /dev/null;
redis-cli --pass public --no-auth-warning -p 7002 info 1> /data/conf/r7002i.log 2> /dev/null;
if [ -s /data/conf/r7002i.log ]; then
:
else
@ -82,7 +92,27 @@ do
if [ "${node}" = "cluster" ] ; then
yes "yes" | redis-cli --cluster create "$LOCAL_IP:7000" "$LOCAL_IP:7001" "$LOCAL_IP:7002" --pass public --no-auth-warning;
elif [ "${node}" = "sentinel" ] ; then
cp /data/conf/sentinel.conf /_sentinel.conf
tee /_sentinel.conf>/dev/null << EOF
port 26379
bind 0.0.0.0 ::
daemonize yes
logfile /var/log/redis-server.log
dir /tmp
EOF
if $tls ; then
cat >>/_sentinel.conf<<EOF
tls-port 26380
tls-replication yes
tls-cert-file /tls/redis.crt
tls-key-file /tls/redis.key
tls-ca-cert-file /tls/ca.crt
sentinel monitor mymaster $LOCAL_IP 8000 1
EOF
else
cat >>/_sentinel.conf<<EOF
sentinel monitor mymaster $LOCAL_IP 7000 1
EOF
fi
redis-server /_sentinel.conf --sentinel;
fi
REDIS_LOAD_FLG=false;

View File

@ -1,4 +0,0 @@
port 26379
dir /tmp
sentinel monitor mymaster 172.16.239.10 7000 1
logfile /var/log/redis-server.log

View File

@ -4,6 +4,7 @@ on:
push:
tags:
- v*
- e*
release:
types:
- published
@ -300,6 +301,7 @@ jobs:
- tcp
node_type:
- single
- sentinel
- cluster
exclude:
- redis_tag: 5
@ -354,6 +356,15 @@ jobs:
EMQX_AUTH__REDIS__TYPE=single
EMQX_AUTH__REDIS__SERVER=${redis_${{ matrix.network_type }}_address}:6380
EOF
- name: setup
if: matrix.node_type == 'sentinel'
run: |
cat <<-EOF >> "$GITHUB_ENV"
EMQX_AUTH__REDIS__TYPE=sentinel
EMQX_AUTH__REDIS__SERVER=${redis_${{ matrix.network_type }}_address}:26379
EMQX_AUTH__REDIS__SENTINEL=mymaster
EMQX_AUTH__REDIS__POOL=1
EOF
- name: setup
if: matrix.node_type == 'cluster' && matrix.connect_type == 'tcp'
run: |

View File

@ -4,6 +4,7 @@ on:
push:
tags:
- v*
- e*
release:
types:
- published

View File

@ -4,6 +4,7 @@ on:
push:
tags:
- v*
- e*
release:
types:
- published
@ -109,8 +110,9 @@ jobs:
docker exec -i erlang bash -c "make ct"
- name: run cover
run: |
printenv > .env
docker exec -i erlang bash -c "make cover"
docker exec -i erlang bash -c "make coveralls"
docker exec --env-file .env -i erlang bash -c "make coveralls"
- uses: actions/upload-artifact@v1
if: failure()
with:
@ -120,3 +122,15 @@ jobs:
with:
name: cover
path: _build/test/cover
finish:
needs: run_common_test
runs-on: ubuntu-20.04
steps:
- name: Coveralls Finished
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
curl -v -k https://coveralls.io/webhook \
--header "Content-Type: application/json" \
--data "{\"repo_name\":\"$GITHUB_REPOSITORY\",\"repo_token\":\"$GITHUB_TOKEN\",\"payload\":{\"build_num\":$GITHUB_RUN_ID,\"status\":\"done\"}}"

View File

@ -1,11 +1,11 @@
$(shell $(CURDIR)/scripts/git-hooks-init.sh)
REBAR_VERSION = 3.14.3-emqx-5
REBAR_VERSION = 3.14.3-emqx-6
REBAR = $(CURDIR)/rebar3
BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
export EMQX_DESC ?= EMQ X
export EMQX_CE_DASHBOARD_VERSION ?= v4.3.0-beta.1
export EMQX_CE_DASHBOARD_VERSION ?= v4.3.0-rc.1
ifeq ($(OS),Windows_NT)
export REBAR_COLOR=none
endif

View File

@ -23,6 +23,8 @@
*EMQ X* 是跨平台的,支持 Linux、Unix、macOS 以及 Windows。这意味着 *EMQ X* 可以部署在 x86_64 架构的服务器上,也可以部署在 Raspberry Pi 这样的 ARM 设备上。
Windows 上编译和运行 *EMQ X* 的详情参考:[Windows.md](./Windows.md)
#### EMQ X Docker 镜像安装
```
@ -40,16 +42,29 @@ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p
3.0 版本开始,构建 *EMQ X* 需要 Erlang/OTP R21+。
4.3 及以后的版本:
```bash
git clone https://github.com/emqx/emqx.git
cd emqx
make
_build/emqx/rel/emqx/bin console
```
对于 4.3 之前的版本,通过另外一个仓库构建:
```bash
git clone https://github.com/emqx/emqx-rel.git
cd emqx-rel && make
cd _rel/emqx && ./bin/emqx console
cd emqx-rel
make
_build/emqx/rel/emqx/bin/emqx console
```
## 快速入门
如果 emqx 从源码编译,`cd _build/emqx/rel/emqx`。
如果 emqx 通过 zip 包安装,则切换到 emqx 的根目录。
```
# Start emqx
./bin/emqx start
@ -63,6 +78,20 @@ cd _rel/emqx && ./bin/emqx console
*EMQ X* 启动,可以使用浏览器访问 http://localhost:18083 来查看 Dashboard。
## 测试
### 执行所有测试
```
make eunit ct
```
### 执行部分应用的 common tests
```bash
make apps/emqx_bridge_mqtt-ct
```
### 静态分析(Dialyzer)
##### 分析所有应用程序
```
@ -74,15 +103,27 @@ make dialyzer
DIALYZER_ANALYSE_APP=emqx_lwm2m,emqx_auth_jwt,emqx_auth_ldap make dialyzer
```
## FAQ
## 社区
### FAQ
访问 [EMQ X FAQ](https://docs.emqx.cn/broker/latest/faq/faq.html) 以获取常见问题的帮助。
## 产品路线
### 问答
通过 [EMQ X Roadmap uses Github milestones](https://github.com/emqx/emqx/milestones) 参与跟踪项目进度。
[GitHub Discussions](https://github.com/emqx/emqx/discussions)
[EMQ 中文问答社区](https://askemq.com)
## 社区、讨论、贡献和支持
### 参与设计
如果对 EMQ X 有改进建议,可以向[EIP](https://github.com/emqx/eip) 提交 PR 和 ISSUE
### 插件开发
如果想集成或开发你自己的插件,参考 [lib-extra/README.md](./lib-extra/README.md)
### 联系我们
你可通过以下途径与 EMQ 社区及开发者联系:
@ -90,7 +131,6 @@ DIALYZER_ANALYSE_APP=emqx_lwm2m,emqx_auth_jwt,emqx_auth_ldap make dialyzer
- [Twitter](https://twitter.com/emqtt)
- [Facebook](https://www.facebook.com/emqxmqtt)
- [Reddit](https://www.reddit.com/r/emqx/)
- [Forum](https://askemq.com)
- [Weibo](https://weibo.com/emqtt)
- [Blog](https://www.emqx.cn/blog)

View File

@ -11,18 +11,22 @@
[English](./README.md) | [简体中文](./README-CN.md) | 日本語
*EMQ X* ブローカーは、数千万のクライアントを同時に処理できるIoT、M2M、モバイルアプリケーション向けの、完全なオープンソース、高拡張性、高可用性、分散型MQTTメッセージングブローカーです。
*EMQ X* は、高い拡張性と可用性をもつ、分散型のMQTTブローカーです。数千万のクライアントを同時に処理するIoT、M2M、モバイルアプリケーション向けです。
バージョン3.0以降、*EMQ X* ブローカーはMQTTV5.0プロトコル仕様を完全にサポートし、MQTT V3.1およびV3.1.1と下位互換性があります。MQTT-SN、CoAP、LwM2M、WebSocket、STOMPなどの通信プロトコルをサポートしています。 1つのクラスター上で1,000万以上の同時MQTT接続に拡張することができます。
version 3.0 以降、*EMQ X* は MQTT V5.0 の仕様を完全にサポートしており、MQTT V3.1およびV3.1.1とも下位互換性があります。
MQTT-SN、CoAP、LwM2M、WebSocket、STOMPなどの通信プロトコルをサポートしています。 MQTTの同時接続数は1つのクラスター上で1,000万以上にまでスケールできます。
- 新機能の一覧については、[EMQ Xリリースート](https://github.com/emqx/emqx/releases)を参照してください。
- 詳細はこちら[EMQ X公式ウェブサイト](https://www.emqx.io/)をご覧ください。
## インストール
*EMQ X* ブローカーはクロスプラットフォームで、Linux、Unix、macOS、Windowsをサポートしています。これは、*EMQ X* ブローカーをx86_64アーキテクチャサーバー、またはRaspberryPiなどのARMデバイスにデプロイできることを意味します。
*EMQ X* はクロスプラットフォームで、Linux、Unix、macOS、Windowsをサポートしています。
そのため、x86_64アーキテクチャサーバー、またはRaspberryPiなどのARMデバイスに *EMQ X* をデプロイすることもできます。
#### EMQ X Dockerイメージによるインストール
Windows上における *EMQ X* のビルドと実行については、[Windows.md](./Windows.md)をご参照ください。
#### Docker イメージによる EMQ X のインストール
```
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
@ -30,25 +34,31 @@ docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p
#### バイナリパッケージによるインストール
対応するオペレーティングシステムのバイナリソフトウェアパッケージは、[EMQ Xのダウンロード](https://www.emqx.io/downloads)ページから取得できます。
それぞれのOSに対応したバイナリソフトウェアパッケージは、[EMQ Xのダウンロード](https://www.emqx.io/downloads)ページから取得できます。
- [シングルノードインストール](https://docs.emqx.io/broker/latest/en/getting-started/installation.html)
- [マルチノードインストール](https://docs.emqx.io/broker/latest/en/advanced/cluster.html)
## ソースからビルド
バージョン3.0以降、*EMQ X* ブローカーをビルドするには Erlang/OTP R21+ が必要です。
version 3.0 以降の *EMQ X* をビルドするには Erlang/OTP R21+ が必要です。
```
version 4.3 以降の場合:
```bash
git clone https://github.com/emqx/emqx-rel.git
cd emqx-rel && make
cd _rel/emqx && ./bin/emqx console
cd emqx-rel
make
_build/emqx/rel/emqx/bin/emqx console
```
## クイックスタート
emqx をソースコードからビルドした場合は、
`cd _buid/emqx/rel/emqx`でリリースビルドのディレクトリに移動してください。
リリースパッケージからインストールした場合は、インストール先のルートディレクトリに移動してください。
```
# Start emqx
./bin/emqx start
@ -60,7 +70,21 @@ cd _rel/emqx && ./bin/emqx console
./bin/emqx stop
```
*EMQ X* ブローカーを起動したら、ブラウザで http://localhost:18083 にアクセスしてダッシュボードを表示できます。
*EMQ X* の起動後、ブラウザで http://localhost:18083 にアクセスするとダッシュボードが表示されます。
## テスト
### 全てのテストケースを実行する
```
make eunit ct
```
### common test の一部を実行する
```bash
make apps/emqx_bridge_mqtt-ct
```
### Dialyzer
##### アプリケーションの型情報を解析する
@ -73,30 +97,28 @@ make dialyzer
DIALYZER_ANALYSE_APP=emqx_lwm2m,emqx_auth_jwt,emqx_auth_ldap make dialyzer
```
## FAQ
## コミュニティ
よくある質問については、[EMQ X FAQ](https://docs.emqx.io/broker/latest/en/faq/faq.html)にアクセスしてください。
### FAQ
## ロードマップ
よくある質問については、[EMQ X FAQ](https://docs.emqx.io/broker/latest/en/faq/faq.html)をご確認ください。
[EMQ X Roadmap uses Github milestones](https://github.com/emqx/emqx/milestones)からプロジェクトの進捗状況を追跡できます。
### 質問する
## コミュニティ、ディスカッション、貢献、サポート
質問や知識共有の場として[GitHub Discussions](https://github.com/emqx/emqx/discussions)を用意しています。
次のチャネルを通じて、EMQコミュニティおよび開発者に連絡できます。
### 提案
- [Slack](https://slack-invite.emqx.io/)
- [Twitter](https://twitter.com/emqtt)
- [Facebook](https://www.facebook.com/emqxmqtt)
- [Reddit](https://www.reddit.com/r/emqx/)
- [Forum](https://groups.google.com/d/forum/emqtt)
- [Blog](https://medium.com/@emqtt)
大規模な改善のご提案がある場合は、[EIP](https://github.com/emqx/eip)にPRをどうぞ。
バグ、問題、機能のリクエストは、[emqx/emqx](https://github.com/emqx/emqx/issues)に送信してください。
### 自作プラグイン
## MQTT仕様
プラグインを自作することができます。[lib-extra/README.md](./lib-extra/README.md)をご確認ください。
次のリンクから、MQTTプロトコルについて学習および確認できます。
## MQTTの仕様について
下記のサイトで、MQTTのプロトコルについて学習・確認できます。
[MQTT Version 3.1.1](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html)

View File

@ -1,5 +1,13 @@
{deps,
[{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.7"}}}
%% NOTE: mind poolboy version when updating mongodb-erlang version
[{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.7"}}},
%% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git
%% (which has overflow_ttl feature added).
%% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07).
%% By accident, We have always been using the upstream fork due to
%% eredis_cluster's dependency getting resolved earlier.
%% Here we pin 1.5.2 to avoid surprises in the future.
{poolboy, {git, "https://github.com/emqx/poolboy.git", {tag, "1.5.2"}}}
]}.
{edoc_opts, [{preprocess, true}]}.

View File

@ -105,7 +105,7 @@
Key = cuttlefish:conf_get("auth.mysql.ssl.keyfile", Conf, undefined),
Verify = case cuttlefish:conf_get("auth.mysql.ssl.verify", Conf, false) of
true -> verify_peer;
flase -> verify_none
false -> verify_none
end,
SNI = case cuttlefish:conf_get("auth.mysql.ssl.server_name_indication", Conf, undefined) of
"disable" -> disable;

View File

@ -17,6 +17,7 @@
-module(emqx_auth_pgsql_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-define(POOL, emqx_auth_pgsql).

View File

@ -94,7 +94,7 @@
),
Verify = case cuttlefish:conf_get("auth.redis.ssl.verify", Conf, false) of
true -> verify_peer;
flase -> verify_none
false -> verify_none
end,
SNI = case cuttlefish:conf_get("auth.redis.ssl.server_name_indication", Conf, undefined) of
"disable" -> disable;

View File

@ -1,5 +1,8 @@
{deps,
[{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.4"}}}
%% NOTE: mind poolboy version when updating eredis_cluster version
%% poolboy version may clash with emqx_auth_mongo
[{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.4"}}},
{poolboy, {git, "https://github.com/emqx/poolboy.git", {tag, "1.5.2"}}}
]}.
{erl_opts, [warn_unused_vars,

View File

@ -17,6 +17,7 @@
-module(emqx_auth_redis_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").

View File

@ -124,24 +124,31 @@ safe_stop(Pid, StopF, Timeout) ->
end.
send(Conn, Msgs) ->
send(Conn, Msgs, undefined).
send(_Conn, [], PktId) ->
{ok, PktId};
send(#{client_pid := ClientPid} = Conn, [Msg | Rest], _PktId) ->
send(Conn, Msgs, []).
send(_Conn, [], []) ->
%% all messages in the batch are QoS-0
Ref = make_ref(),
%% QoS-0 messages do not have packet ID
%% the batch ack is simulated with a loop-back message
self() ! {batch_ack, Ref},
{ok, Ref};
send(_Conn, [], PktIds) ->
%% PktIds is not an empty list if there is any non-QoS-0 message in the batch,
%% And the worker should wait for all acks
{ok, PktIds};
send(#{client_pid := ClientPid} = Conn, [Msg | Rest], PktIds) ->
case emqtt:publish(ClientPid, Msg) of
ok ->
Ref = make_ref(),
self() ! {batch_ack, Ref},
send(Conn, Rest, Ref);
send(Conn, Rest, PktIds);
{ok, PktId} ->
send(Conn, Rest, PktId);
send(Conn, Rest, [PktId | PktIds]);
{error, Reason} ->
%% NOTE: There is no partial sucess of a batch and recover from the middle
%% only to retry all messages in one batch
{error, Reason}
end.
handle_puback(#{packet_id := PktId, reason_code := RC}, Parent)
when RC =:= ?RC_SUCCESS;
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS ->

View File

@ -62,6 +62,8 @@
-module(emqx_bridge_worker).
-behaviour(gen_statem).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% APIs
-export([ start_link/1
, start_link/2
@ -320,7 +322,7 @@ idle(state_timeout, reconnect, State) ->
connecting(State);
idle(info, {batch_ack, Ref}, State) ->
{ok, NewState} = do_ack(State, Ref),
NewState = handle_batch_ack(State, Ref),
{keep_state, NewState};
idle(Type, Content, State) ->
@ -355,7 +357,7 @@ connected(info, {disconnected, Conn, Reason},
keep_state_and_data
end;
connected(info, {batch_ack, Ref}, State) ->
{ok, NewState} = do_ack(State, Ref),
NewState = handle_batch_ack(State, Ref),
{keep_state, NewState, {next_event, internal, maybe_send}};
connected(Type, Content, State) ->
common(connected, Type, Content, State).
@ -479,11 +481,16 @@ retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Inflight]) ->
{error, State1} -> {error, State1}
end.
pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) when length(Inflight) >= Max ->
pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) ->
pop_and_send_loop(State, Max - length(Inflight)).
pop_and_send_loop(State, 0) ->
?tp(debug, inflight_full, #{}),
{ok, State};
pop_and_send(#{replayq := Q, connect_module := Module} = State) ->
pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) ->
case replayq:is_empty(Q) of
true ->
?tp(debug, replayq_drained, #{}),
{ok, State};
false ->
BatchSize = case Module of
@ -492,7 +499,10 @@ pop_and_send(#{replayq := Q, connect_module := Module} = State) ->
end,
Opts = #{count_limit => BatchSize, bytes_limit => 999999999},
{Q1, QAckRef, Batch} = replayq:pop(Q, Opts),
do_send(State#{replayq := Q1}, QAckRef, Batch)
case do_send(State#{replayq := Q1}, QAckRef, Batch) of
{ok, NewState} -> pop_and_send_loop(NewState, N - 1);
{error, NewState} -> {error, NewState}
end
end.
%% Assert non-empty batch because we have a is_empty check earlier.
@ -500,36 +510,65 @@ do_send(#{inflight := Inflight,
connect_module := Module,
connection := Connection,
mountpoint := Mountpoint,
if_record_metrics := IfRecordMetrics} = State, QAckRef, Batch) ->
if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) ->
ExportMsg = fun(Message) ->
bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'),
emqx_bridge_msg:to_export(Module, Mountpoint, Message)
end,
case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of
{ok, Ref} ->
{ok, Refs} ->
{ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef,
send_ack_ref => Ref,
send_ack_ref => map_set(Refs),
batch => Batch}]}};
{error, Reason} ->
?LOG(info, "Batch produce failed~p", [Reason]),
?LOG(info, "mqtt_bridge_produce_failed ~p", [Reason]),
{error, State}
end.
%% map as set, ack-reference -> 1
map_set(Ref) when is_reference(Ref) ->
%% QoS-0 or RPC call returns a reference
map_set([Ref]);
map_set(List) ->
map_set(List, #{}).
do_ack(#{inflight := []} = State, Ref) ->
?LOG(error, "Can't be found from the inflight:~p", [Ref]),
{ok, State};
map_set([], Set) -> Set;
map_set([H | T], Set) -> map_set(T, Set#{H => 1}).
do_ack(#{inflight := [#{send_ack_ref := Ref,
q_ack_ref := QAckRef}| Rest], replayq := Q} = State, Ref) ->
ok = replayq:ack(Q, QAckRef),
{ok, State#{inflight => Rest}};
handle_batch_ack(#{inflight := Inflight0, replayq := Q} = State, Ref) ->
Inflight1 = do_ack(Inflight0, Ref),
Inflight = drop_acked_batches(Q, Inflight1),
State#{inflight := Inflight}.
do_ack(#{inflight := [#{q_ack_ref := QAckRef,
batch := Batch}| Rest], replayq := Q} = State, Ref) ->
ok = replayq:ack(Q, QAckRef),
NewQ = replayq:append(Q, Batch),
do_ack(State#{replayq => NewQ, inflight => Rest}, Ref).
do_ack([], Ref) ->
?LOG(debug, "stale_batch_ack_reference ~p", [Ref]),
[];
do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) ->
case maps:is_key(Ref, Refs) of
true ->
NewRefs = maps:without([Ref], Refs),
[First#{send_ack_ref := NewRefs} | Rest];
false ->
[First | do_ack(Rest, Ref)]
end.
%% Drop the consecutive header of the inflight list having empty send_ack_ref
drop_acked_batches(_Q, []) ->
?tp(debug, inflight_drained, #{}),
[];
drop_acked_batches(Q, [#{send_ack_ref := Refs,
q_ack_ref := QAckRef} | Rest] = All) ->
case maps:size(Refs) of
0 ->
%% all messages are acked by bridge target
%% now it's safe to ack replayq (delete from disk)
ok = replayq:ack(Q, QAckRef),
%% continue to check more sent batches
drop_acked_batches(Q, Rest);
_ ->
%% the head (oldest) inflight batch is not acked, keep waiting
All
end.
subscribe_local_topics(Topics, Name) ->
lists:foreach(fun(Topic) -> subscribe_local_topic(Topic, Name) end, Topics).

View File

@ -0,0 +1,40 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_stub_conn).
-behaviour(emqx_bridge_connect).
%% behaviour callbacks
-export([ start/1
, send/2
, stop/1
]).
-type ack_ref() :: emqx_bridge_worker:ack_ref().
-type batch() :: emqx_bridge_worker:batch().
start(Cfg) ->
{ok, Cfg}.
stop(_) -> ok.
%% @doc Callback for `emqx_bridge_connect' behaviour
-spec send(_, batch()) -> {ok, ack_ref()} | {error, any()}.
send(#{stub_pid := Pid}, Batch) ->
Ref = make_ref(),
Pid ! {stub_message, self(), Ref, Batch},
{ok, Ref}.

View File

@ -16,20 +16,19 @@
-module(emqx_bridge_worker_SUITE).
-export([ all/0
, init_per_suite/1
, end_per_suite/1]).
-export([ t_rpc/1
, t_mqtt/1
, t_mngr/1]).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
-define(SNK_WAIT(WHAT), ?assertMatch({ok, _}, ?block_until(#{?snk_kind := WHAT}, 2000, 1000))).
receive_messages(Count) ->
receive_messages(Count, []).
@ -45,10 +44,15 @@ receive_messages(Count, Msgs) ->
Msgs
end.
all() -> [ t_rpc
, t_mqtt
, t_mngr
].
all() ->
lists:filtermap(
fun({FunName, _Arity}) ->
case atom_to_list(FunName) of
"t_" ++ _ -> {true, FunName};
_ -> false
end
end,
?MODULE:module_info(exports)).
init_per_suite(Config) ->
case node() of
@ -56,12 +60,19 @@ init_per_suite(Config) ->
_ -> ok
end,
ok = application:set_env(gen_rpc, tcp_client_num, 1),
emqx_ct_helpers:start_apps([emqx_bridge_mqtt]),
emqx_ct_helpers:start_apps([emqx_modules, emqx_bridge_mqtt]),
emqx_logger:set_log_level(error),
[{log_level, error} | Config].
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_bridge_mqtt]).
emqx_ct_helpers:stop_apps([emqx_bridge_mqtt, emqx_modules]).
init_per_testcase(_TestCase, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop().
t_mngr(Config) when is_list(Config) ->
Subs = [{<<"a">>, 1}, {<<"b">>, 2}],
@ -174,3 +185,103 @@ t_mqtt(Config) when is_list(Config) ->
after
ok = emqx_bridge_worker:stop(Pid)
end.
t_stub_normal(Config) when is_list(Config) ->
Cfg = #{forwards => [<<"t_stub_normal/#">>],
connect_module => emqx_bridge_stub_conn,
forward_mountpoint => <<"forwarded">>,
start_type => auto,
stub_pid => self()
},
{ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"ClientId">>,
try
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(ConnPid),
{ok, _PacketId} = emqtt:publish(ConnPid, <<"t_stub_normal/one">>, <<"hello">>, ?QOS_1),
receive
{stub_message, WorkerPid, BatchRef, _Batch} ->
WorkerPid ! {batch_ack, BatchRef},
ok
end,
?SNK_WAIT(inflight_drained),
?SNK_WAIT(replayq_drained),
emqtt:disconnect(ConnPid)
after
ok = emqx_bridge_worker:stop(Pid)
end.
t_stub_overflow(Config) when is_list(Config) ->
Topic = <<"t_stub_overflow/one">>,
MaxInflight = 20,
Cfg = #{forwards => [Topic],
connect_module => emqx_bridge_stub_conn,
forward_mountpoint => <<"forwarded">>,
start_type => auto,
stub_pid => self(),
max_inflight => MaxInflight
},
{ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"ClientId">>,
try
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(ConnPid),
lists:foreach(
fun(I) ->
Data = integer_to_binary(I),
_ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
end, lists:seq(1, MaxInflight * 2)),
?SNK_WAIT(inflight_full),
Acks = stub_receive(MaxInflight),
lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks),
Acks2 = stub_receive(MaxInflight),
lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks2),
?SNK_WAIT(inflight_drained),
?SNK_WAIT(replayq_drained),
emqtt:disconnect(ConnPid)
after
ok = emqx_bridge_worker:stop(Worker)
end.
t_stub_random_order(Config) when is_list(Config) ->
Topic = <<"t_stub_random_order/a">>,
MaxInflight = 10,
Cfg = #{forwards => [Topic],
connect_module => emqx_bridge_stub_conn,
forward_mountpoint => <<"forwarded">>,
start_type => auto,
stub_pid => self(),
max_inflight => MaxInflight
},
{ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"ClientId">>,
try
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(ConnPid),
lists:foreach(
fun(I) ->
Data = integer_to_binary(I),
_ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
end, lists:seq(1, MaxInflight)),
Acks = stub_receive(MaxInflight),
lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end,
lists:reverse(Acks)),
?SNK_WAIT(inflight_drained),
?SNK_WAIT(replayq_drained),
emqtt:disconnect(ConnPid)
after
ok = emqx_bridge_worker:stop(Worker)
end.
stub_receive(N) ->
stub_receive(N, []).
stub_receive(0, Acc) -> lists:reverse(Acc);
stub_receive(N, Acc) ->
receive
{stub_message, WorkerPid, BatchRef, _Batch} ->
stub_receive(N - 1, [{WorkerPid, BatchRef} | Acc])
after
5000 ->
lists:reverse(Acc)
end.

View File

@ -126,7 +126,7 @@ value(Value, ResourceId, ObjDefinition) ->
Value; % keep binary type since it is same as a string for jsx
"Integer" ->
Size = byte_size(Value)*8,
<<IntResult:Size>> = Value,
<<IntResult:Size/signed>> = Value,
IntResult;
"Float" ->
Size = byte_size(Value)*8,
@ -365,7 +365,7 @@ encode_int(Int) when Int >= 0 ->
binary:encode_unsigned(Int);
encode_int(Int) when Int < 0 ->
Size = byte_size_of_signed(-Int) * 8,
<<Int:Size>>.
<<Int:Size/signed>>.
byte_size_of_signed(UInt) ->
byte_size_of_signed(UInt, 0).

View File

@ -26,7 +26,7 @@
-rest_api(#{name => clean_acl_cache_node,
method => 'DELETE',
path => "/:atom:node/acl-cache",
path => "nodes/:atom:node/acl-cache",
func => clean_node,
descr => "Clean acl cache on specific node"}).

View File

@ -44,7 +44,7 @@
, import_blacklist/1
, import_applications/1
, import_users/1
, import_auth_clientid/1 %% BACKW: 4.1.x
, import_auth_clientid/2 %% BACKW: 4.1.x
, import_auth_username/1 %% BACKW: 4.1.x
, import_auth_mnesia/2
, import_acl_mnesia/2
@ -240,6 +240,7 @@ import_resources_and_rules(Resources, Rules, _FromVersion) ->
import_resources(Resources),
import_rules(Rules).
-else.
import_resources_and_rules(Resources, Rules, FromVersion)
when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" orelse
@ -249,7 +250,6 @@ import_resources_and_rules(Resources, Rules, FromVersion)
NActions = apply_new_config(Actions, Configs),
import_rule(Rule#{<<"actions">> := NActions})
end, Rules);
import_resources_and_rules(Resources, Rules, _FromVersion) ->
import_resources(Resources),
import_rules(Rules).
@ -264,42 +264,39 @@ compatible_version(#{<<"id">> := ID,
<<"pool_size">> := PoolSize,
<<"request_timeout">> := RequestTimeout,
<<"url">> := URL}} = Resource, Acc) ->
CovertFun = fun(Int) ->
list_to_binary(integer_to_list(Int) ++ "s")
end,
Cfg = make_new_config(#{<<"pool_size">> => PoolSize,
<<"connect_timeout">> => CovertFun(ConnectTimeout),
<<"request_timeout">> => CovertFun(RequestTimeout),
<<"url">> => URL}),
{ok, _Resource} = import_resource(Resource#{<<"config">> := Cfg}),
NHeaders = maps:put(<<"content-type">>, ContentType, covert_empty_headers(Headers)),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
CovertFun = fun(Int) ->
list_to_binary(integer_to_list(Int) ++ "s")
end,
Cfg = make_new_config(#{<<"pool_size">> => PoolSize,
<<"connect_timeout">> => CovertFun(ConnectTimeout),
<<"request_timeout">> => CovertFun(RequestTimeout),
<<"url">> => URL}),
{ok, _Resource} = import_resource(Resource#{<<"config">> := Cfg}),
NHeaders = maps:put(<<"content-type">>, ContentType, covert_empty_headers(Headers)),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
% 4.2.0
compatible_version(#{<<"id">> := ID,
<<"type">> := <<"web_hook">>,
<<"config">> := #{<<"headers">> := Headers,
<<"method">> := Method,%% 4.2.0 Different here
<<"url">> := URL}} = Resource, Acc) ->
Cfg = make_new_config(#{<<"url">> => URL}),
{ok, _Resource} = import_resource(Resource#{<<"config">> := Cfg}),
NHeaders = maps:put(<<"content-type">>, <<"application/json">> , covert_empty_headers(Headers)),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
<<"type">> := <<"web_hook">>,
<<"config">> := #{<<"headers">> := Headers,
<<"method">> := Method,%% 4.2.0 Different here
<<"url">> := URL}} = Resource, Acc) ->
Cfg = make_new_config(#{<<"url">> => URL}),
{ok, _Resource} = import_resource(Resource#{<<"config">> := Cfg}),
NHeaders = maps:put(<<"content-type">>, <<"application/json">> , covert_empty_headers(Headers)),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
%% bridge mqtt
%% 4.2.0 - 4.2.5 bridge_mqtt, ssl enabled from on/off to true/false
compatible_version(#{<<"type">> := <<"bridge_mqtt">>,
<<"id">> := ID, %% begin 4.2.0.
<<"config">> := #{<<"ssl">> := Ssl} = Config} = Resource, Acc) ->
F = fun(B) ->
case B of
<<"on">> -> true;
<<"off">> -> false;
Other -> Other
end
end,
NewConfig = Config#{<<"ssl">> := F(Ssl)},
{ok, _Resource} = import_resource(Resource#{<<"config">> := NewConfig}),
[{ID, NewConfig} | Acc];
NewConfig = Config#{<<"ssl">> := flag_to_boolean(Ssl),
<<"pool_size">> => case maps:get(<<"pool_size">>, Config, undefined) of %% 4.0.x, compatible `pool_size`
undefined -> 8;
PoolSize -> PoolSize
end},
{ok, _Resource} = import_resource(Resource#{<<"config">> := NewConfig}),
[{ID, NewConfig} | Acc];
% 4.2.3, add :content_type
compatible_version(#{<<"id">> := ID,
@ -308,14 +305,14 @@ compatible_version(#{<<"id">> := ID,
<<"content_type">> := ContentType,%% 4.2.3 Different here
<<"method">> := Method,
<<"url">> := URL}} = Resource, Acc) ->
Cfg = make_new_config(#{<<"url">> => URL}),
{ok, _Resource} = import_resource(Resource#{<<"config">> := Cfg}),
NHeaders = maps:put(<<"content-type">>, ContentType, covert_empty_headers(Headers)),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
Cfg = make_new_config(#{<<"url">> => URL}),
{ok, _Resource} = import_resource(Resource#{<<"config">> := Cfg}),
NHeaders = maps:put(<<"content-type">>, ContentType, covert_empty_headers(Headers)),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
% normal version
compatible_version(Resource, Acc) ->
{ok, _Resource} = import_resource(Resource),
Acc.
{ok, _Resource} = import_resource(Resource),
Acc.
make_new_config(Cfg) ->
Config = #{<<"pool_size">> => 8,
@ -408,12 +405,18 @@ import_users(Users) ->
emqx_dashboard_admin:force_add_user(Username, NPassword, Tags)
end, Users).
import_auth_clientid(Lists) ->
import_auth_clientid(Lists, Version) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
lists:foreach(fun(#{<<"clientid">> := Clientid, <<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {clientid, Clientid}, base64:decode(Password), erlang:system_time(millisecond)})
lists:foreach(fun(#{<<"clientid">> := Clientid, <<"password">> := Password0}) ->
Password = case Version of
"4.1" -> base64:decode(Password0);
_ -> ensure_binary(Password0)
end,
mnesia:dirty_write({emqx_user, {clientid, Clientid}
, Password
, erlang:system_time(millisecond)})
end, Lists)
end.
@ -487,8 +490,8 @@ do_import_auth_mnesia(Auths) ->
_ ->
lists:foreach(fun(#{<<"login">> := Login,
<<"type">> := Type,
<<"password">> := Password,
<<"created_at">> := CreatedAt }) ->
<<"password">> := Password } = Map) ->
CreatedAt = maps:get(<<"created_at">>, Map, erlang:system_time(millisecond)),
mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt})
end, Auths)
end.
@ -514,15 +517,15 @@ do_import_acl_mnesia(Acls) ->
undefined -> ok;
_ ->
lists:foreach(fun(Map = #{<<"action">> := Action,
<<"access">> := Access,
<<"created_at">> := CreatedAt}) ->
Filter = case maps:get(<<"type_value">>, Map, undefined) of
<<"access">> := Access}) ->
Topic = maps:get(<<"topic">>, Map),
Login = case maps:get(<<"type_value">>, Map, undefined) of
undefined ->
{any_to_atom(maps:get(<<"type">>, Map)), maps:get(<<"topic">>, Map)};
all;
Value ->
{{any_to_atom(maps:get(<<"type">>, Map)), Value}, maps:get(<<"topic">>, Map)}
{any_to_atom(maps:get(<<"type">>, Map)), Value}
end,
mnesia:dirty_write({emqx_acl ,Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt})
emqx_acl_mnesia_cli:add_acl(Login, Topic, any_to_atom(Action), any_to_atom(Access))
end, Acls)
end.
@ -645,7 +648,7 @@ do_import_data(Data, Version) ->
import_blacklist(maps:get(<<"blacklist">>, Data, [])),
import_applications(maps:get(<<"apps">>, Data, [])),
import_users(maps:get(<<"users">>, Data, [])),
import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
import_auth_clientid(maps:get(<<"auth_clientid">>, Data, []), Version),
import_auth_username(maps:get(<<"auth_username">>, Data, [])),
import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version).
@ -661,19 +664,23 @@ do_import_extra_data(_Data, _Version) -> ok.
-endif.
-ifndef(EMQX_ENTERPRISE).
covert_empty_headers(Headers) ->
case Headers of
[] -> #{};
Other -> Other
end.
covert_empty_headers([]) -> #{};
covert_empty_headers(Other) -> Other.
flag_to_boolean(<<"on">>) -> true;
flag_to_boolean(<<"off">>) -> false;
flag_to_boolean(Other) -> Other.
-endif.
read_global_auth_type(Data, Version) when Version =:= "4.0" orelse
Version =:= "4.1" orelse
Version =:= "4.2" ->
ct:print("|>=> :~p~n", [Data]),
case Data of
#{<<"auth.mnesia.as">> := <<"username">>} -> application:set_env(emqx_auth_mnesia, as, username);
#{<<"auth.mnesia.as">> := <<"clientid">>} -> application:set_env(emqx_auth_mnesia, as, clientid);
#{<<"auth.mnesia.as">> := <<"username">>} ->
application:set_env(emqx_auth_mnesia, as, username);
#{<<"auth.mnesia.as">> := <<"clientid">>} ->
application:set_env(emqx_auth_mnesia, as, clientid);
_ ->
logger:error("While importing data from EMQX versions prior to 4.3 "
"it is necessary to specify the value of \"auth.mnesia.as\" parameter "
@ -691,3 +698,8 @@ read_global_auth_type(_Data, _Version) ->
get_old_type() ->
{ok, Type} = application:get_env(emqx_auth_mnesia, as),
Type.
ensure_binary(A) when is_binary(A) ->
A;
ensure_binary(A) ->
list_to_binary(A).

View File

@ -26,13 +26,26 @@
-include_lib("emqx_auth_mnesia/include/emqx_auth_mnesia.hrl").
-ifdef(EMQX_ENTERPRISE).
-define(VERSIONS, ["e4.1", "e4.2"]).
-else.
-define(VERSIONS, ["v4.1", "v4.2"]).
-endif.
matrix() ->
[ {username, "e4.2.9"}
, {clientid, "e4.1.1"}
, {username, "e4.1.1"}
].
all() ->
[{group, Id} || {Id, _, _} <- groups()].
[t_matrix].
-else. %% ! EMQX_ENTERPRISE
matrix() ->
[{ImportAs, Version} || ImportAs <- [clientid, username]
, Version <- ["v4.2.9", "v4.1.5"]].
all() ->
[t_matrix, t_import_4_0].
-endif. %% EMQX_ENTERPRISE
groups() ->
[{username, [], cases()}, {clientid, [], cases()}].
@ -50,14 +63,6 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_modules, emqx_management, emqx_dashboard, emqx_management, emqx_auth_mnesia]),
ekka_mnesia:ensure_stopped().
init_per_group(username, Config) ->
[{cred_type, username} | Config];
init_per_group(clientid, Config) ->
[{cred_type, clientid} | Config].
end_per_group(_, Config) ->
Config.
init_per_testcase(_, Config) ->
Config.
@ -66,17 +71,27 @@ end_per_testcase(_, _Config) ->
mnesia:clear_table(emqx_user),
ok.
t_import(Config) ->
test_import(Config, ?VERSIONS).
t_matrix(Config) ->
[begin
ct:pal("Testing import of ~p from ~p", [ImportAs, FromVersion]),
do_import(Config, ImportAs, FromVersion),
test_clientid_import(),
ct:pal("ok")
end
|| {ImportAs, FromVersion} <- matrix()].
test_import(Config, [V | Versions]) ->
do_import(Config, V),
test_import(Config, Versions);
test_import(_Config, []) -> ok.
%% This version is special, since it doesn't have mnesia ACL plugin
t_import_4_0(Config) ->
mnesia:clear_table(emqx_acl),
mnesia:clear_table(emqx_user),
Filename = filename:join(proplists:get_value(data_dir, Config), "v4.0.7.json"),
Overrides = emqx_json:encode(#{<<"auth.mnesia.as">> => atom_to_binary(clientid)}),
?assertMatch(ok, emqx_mgmt_data_backup:import(Filename, Overrides)),
timer:sleep(100),
test_clientid_import().
do_import(Config, V) ->
do_import(Config, Type, V) ->
File = V ++ ".json",
Type = proplists:get_value(cred_type, Config),
mnesia:clear_table(emqx_acl),
mnesia:clear_table(emqx_user),
Filename = filename:join(proplists:get_value(data_dir, Config), File),
@ -95,12 +110,20 @@ do_import(Config, V) ->
access = allow
}],
lists:sort(Records)),
?assertMatch([#emqx_user{
login = {Type, <<"emqx_c">>}
}], ets:tab2list(emqx_user)),
?assertMatch([_, _], ets:tab2list(emqx_user)),
?assertMatch([_], ets:lookup(emqx_user, {Type, <<"emqx_c">>})),
Req = #{clientid => <<"blah">>}
#{Type => <<"emqx_c">>,
password => "emqx_p"
password => <<"emqx_p">>
},
?assertMatch({stop, #{auth_result := success}},
emqx_auth_mnesia:check(Req, #{}, #{hash_type => sha256})).
test_clientid_import() ->
[#emqx_user{password = _Pass}] = ets:lookup(emqx_user, {clientid, <<"emqx_clientid">>}),
%% Req = #{clientid => <<"emqx_clientid">>,
%% password => <<"emqx_p">>
%% },
%% ?assertMatch({stop, #{auth_result := success}},
%% emqx_auth_mnesia:check(Req, #{}, #{hash_type => sha256})),
ok.

View File

@ -0,0 +1,122 @@
#!/bin/bash
set -eux pipefail
# Helper script for creating data export files
container() {
version="${1}"
if [ -z ${2+x} ]; then
ee=""
else
ee="-ee"
fi
container="emqx/emqx${ee}:${version}"
docker rm -f emqx || true
docker run "$container" true # Make sure the image is cached locally
docker run --rm -e EMQX_LOADED_PLUGINS="emqx_auth_mnesia emqx_auth_clientid emqx_management" \
--name emqx -p 8081:8081 "$container" emqx foreground &
sleep 7
}
create_acls () {
url="${1}"
curl -f -v "http://localhost:8081/$url" -u admin:public -d@- <<EOF
[
{
"login":"emqx_c",
"topic":"Topic/A",
"action":"pub",
"allow": true
},
{
"login":"emqx_c",
"topic":"Topic/A",
"action":"sub",
"allow": true
}
]
EOF
}
create_user () {
url="${1}"
curl -f -v "http://localhost:8081/api/v4/$url" -u admin:public -d@- <<EOF
{
"login": "emqx_c",
"password": "emqx_p",
"is_superuser": true
}
EOF
}
export_data() {
filename="${1}"
docker exec emqx emqx_ctl data export
docker exec emqx sh -c 'cat data/*.json' | jq > "$filename.json"
cat "${filename}.json"
}
collect_4_2 () {
container "4.2.9"
create_acls "api/v4/mqtt_acl"
create_user mqtt_user
# Add clientid
docker exec emqx emqx_ctl clientid add emqx_clientid emqx_p
export_data "v4.2.9"
}
collect_e4_2 () {
container "4.2.5" "ee"
# Add ACLs:
docker exec emqx emqx_ctl acl add username emqx_c Topic/A pubsub allow
# Create users
docker exec emqx emqx_ctl user add emqx_c emqx_p
# Add clientid
docker exec emqx emqx_ctl clientid add emqx_clientid emqx_p
export_data "e4.2.9"
}
collect_e4_1 () {
container "4.1.1" "ee"
# Add ACLs:
create_acls "api/v4/emqx_acl"
# Create users
create_user "auth_user"
# Add clientid
docker exec emqx emqx_ctl clientid add emqx_clientid emqx_p
export_data "e4.1.1"
}
collect_4_1 () {
container "v4.1.5"
create_acls "api/v4/emqx_acl"
create_user auth_user
# Add clientid
docker exec emqx emqx_ctl clientid add emqx_clientid emqx_p
export_data "v4.1.5"
}
collect_4_0 () {
container "v4.0.7"
# Add clientid
docker exec emqx emqx_ctl clientid add emqx_clientid emqx_p
export_data "v4.0.7"
}
collect_4_0
collect_4_1
collect_4_2
collect_e4_2
collect_e4_1
docker kill emqx

View File

@ -0,0 +1,28 @@
{
"version": "4.0",
"users": [],
"schemas": [],
"rules": [],
"resources": [],
"date": "2021-04-07 14:28:49",
"blacklist": [],
"auth_username": [],
"auth_mnesia": [],
"auth_clientid": [
{
"password": "<22><>Pd56c0fcdcd7636dcf8ed1ea48cd3d58acab74030157551f7f7f8684804b9239e",
"clientid": "emqx_clientid"
}
],
"apps": [
{
"status": true,
"secret": "public",
"name": "Default",
"id": "admin",
"expired": "undefined",
"desc": "Application user"
}
],
"acl_mnesia": []
}

View File

@ -0,0 +1,53 @@
{
"version": "4.1",
"users": [
{
"username": "admin",
"tags": "administrator",
"password": "R0TpDmJtE/d5rIXAm6YY61RI0mg="
}
],
"schemas": [],
"rules": [],
"resources": [],
"date": "2021-04-07 14:28:58",
"blacklist": [],
"auth_username": [],
"auth_mnesia": [
{
"password": "Y2ViNWU5MTdmNzkzMGFlOGYwZGMzY2ViNDk2YTQyOGY3ZTY0NDczNmVlYmNhMzZhMmI4ZjZiYmFjNzU2MTcxYQ==",
"login": "emqx_c",
"is_superuser": true
}
],
"auth_clientid": [
{
"password": "MctXdjZkYzRhMDUwMTc4MDM0OWY4YTg1NTg4Y2ZlOThjYWIyMDk3M2UzNjgzYzYyZWYwOTAzMTk2N2E4OWVjZDk4Mjk=",
"clientid": "emqx_clientid"
}
],
"apps": [
{
"status": true,
"secret": "public",
"name": "Default",
"id": "admin",
"expired": "undefined",
"desc": "Application user"
}
],
"acl_mnesia": [
{
"topic": "Topic/A",
"login": "emqx_c",
"allow": true,
"action": "sub"
},
{
"topic": "Topic/A",
"login": "emqx_c",
"allow": true,
"action": "pub"
}
]
}

View File

@ -1,48 +0,0 @@
{
"acl_mnesia": [
{
"action": "sub",
"allow": true,
"login": "emqx_c",
"topic": "Topic/A"
},
{
"action": "pub",
"allow": true,
"login": "emqx_c",
"topic": "Topic/A"
}
],
"apps": [
{
"desc": "Application user",
"expired": "undefined",
"id": "admin",
"name": "Default",
"secret": "public",
"status": true
}
],
"auth_clientid": [],
"auth_mnesia": [
{
"is_superuser": false,
"login": "emqx_c",
"password": "Y2ViNWU5MTdmNzkzMGFlOGYwZGMzY2ViNDk2YTQyOGY3ZTY0NDczNmVlYmNhMzZhMmI4ZjZiYmFjNzU2MTcxYQ=="
}
],
"auth_username": [],
"blacklist": [],
"date": "2021-03-30 09:11:29",
"resources": [],
"rules": [],
"schemas": [],
"users": [
{
"password": "t89PhgOb15rSCdpxm7Obp7QGcyY=",
"tags": "administrator",
"username": "admin"
}
],
"version": "4.1"
}

View File

@ -0,0 +1,53 @@
{
"version": "4.2",
"date": "2021-04-07 14:29:08",
"rules": [],
"resources": [],
"blacklist": [],
"apps": [
{
"id": "admin",
"secret": "public",
"name": "Default",
"desc": "Application user",
"status": true,
"expired": "undefined"
}
],
"users": [
{
"username": "admin",
"password": "gCBXISkivpaeKetFcPSm+Eaxyxs=",
"tags": "administrator"
}
],
"auth_clientid": [
{
"clientid": "emqx_clientid",
"password": "z<>7d413fee461607065c161072f3707dc0a01bd1fc8476eb7be703a74a66701bb14"
}
],
"auth_username": [],
"auth_mnesia": [
{
"login": "emqx_c",
"password": "ceb5e917f7930ae8f0dc3ceb496a428f7e644736eebca36a2b8f6bbac756171a",
"is_superuser": true
}
],
"acl_mnesia": [
{
"login": "emqx_c",
"topic": "Topic/A",
"action": "sub",
"allow": true
},
{
"login": "emqx_c",
"topic": "Topic/A",
"action": "pub",
"allow": true
}
],
"schemas": []
}

View File

@ -1,53 +0,0 @@
{
"schemas": [],
"acl_mnesia": [
{
"allow": true,
"action": "sub",
"topic": "Topic/A",
"login": "emqx_c"
},
{
"allow": true,
"action": "pub",
"topic": "Topic/A",
"login": "emqx_c"
}
],
"auth_mnesia": [
{
"is_superuser": false,
"password": "ceb5e917f7930ae8f0dc3ceb496a428f7e644736eebca36a2b8f6bbac756171a",
"login": "emqx_c"
}
],
"auth_username": [],
"auth_clientid": [],
"users": [
{
"tags": "viewer",
"password": "oVqjR1wOi2u4DtsuXNctYt6+SKE=",
"username": "test"
},
{
"tags": "administrator",
"password": "9SO4rEEZ6rNwA4vAwp3cnXgQsAM=",
"username": "admin"
}
],
"apps": [
{
"expired": "undefined",
"status": true,
"desc": "Application user",
"name": "Default",
"secret": "public",
"id": "admin"
}
],
"blacklist": [],
"resources": [],
"rules": [],
"date": "2021-03-26 09:51:38",
"version": "4.2"
}

View File

@ -0,0 +1,118 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_mqtt_data_export_import_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-compile([export_all, nowarn_export_all]).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Cfg) ->
ok = ekka_mnesia:start(),
ok = emqx_rule_registry:mnesia(boot),
ok = emqx_rule_engine:load_providers(),
emqx_ct_helpers:start_apps([emqx_web_hook,
emqx_bridge_mqtt,
emqx_rule_engine,
emqx_modules,
emqx_management,
emqx_dashboard]),
Cfg.
end_per_suite(Cfg) ->
emqx_ct_helpers:stop_apps([emqx_dashboard,
emqx_management,
emqx_modules,
emqx_rule_engine,
emqx_bridge_mqtt,
emqx_web_hook]),
Cfg.
get_data_path() ->
emqx_ct_helpers:deps_path(emqx_management, "test/emqx_bridge_mqtt_data_export_import_SUITE_data/").
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
handle_config(Config, 420, brigde) ->
?assertEqual(<<"off">>, maps:get(<<"ssl">>, Config));
handle_config(Config, 430, brigde) ->
?assertEqual(false, maps:get(<<"ssl">>, Config));
handle_config(Config, 420, rpc) ->
handle_config(Config, 430, rpc);
handle_config(Config, 409, rpc) ->
handle_config(Config, 420, rpc);
handle_config(Config, 415, rpc) ->
handle_config(Config, 420, rpc);
handle_config(Config, 409, brigde) ->
handle_config(Config, 420, brigde);
handle_config(Config, 415, brigde) ->
handle_config(Config, 420, brigde);
handle_config(Config, 430, rpc) ->
?assertEqual(<<"emqx@127.0.0.1">>, maps:get(<<"address">>, Config)),
?assertEqual(32, maps:get(<<"batch_size">>, Config)),
?assertEqual(<<"off">>, maps:get(<<"disk_cache">>, Config)),
?assertEqual(<<"bridge/emqx/${node}/">>, maps:get(<<"mountpoint">>, Config)),
?assertEqual(<<"30s">>, maps:get(<<"reconnect_interval">>, Config)),
?assertEqual(8, maps:get(<<"pool_size">>, Config));
handle_config(_, _, _) -> ok.
remove_resource(Id) ->
emqx_rule_registry:remove_resource(Id),
emqx_rule_registry:remove_resource_params(Id).
import(FilePath, Version) ->
Overrides = emqx_json:encode(#{<<"auth.mnesia.as">> => atom_to_binary(clientid)}),
ok = emqx_mgmt_data_backup:import(get_data_path() ++ "/" ++ FilePath, Overrides),
lists:foreach(fun(#resource{id = Id, config = Config} = _Resource) ->
case Id of
<<"brigde">> ->
handle_config(Config, Version, brigde),
remove_resource(Id);
<<"rpc">> ->
handle_config(Config, Version, rpc),
remove_resource(Id);
_ -> ok
end
end, emqx_rule_registry:get_resources()).
t_import420(_) ->
import("420.json", 420),
{ok, _} = emqx_mgmt_data_backup:export().
t_import430(_) ->
import("430.json", 430),
{ok, _} = emqx_mgmt_data_backup:export().
t_import409(_) ->
import("409.json", 409),
{ok, _} = emqx_mgmt_data_backup:export().
t_import415(_) ->
import("415.json", 415),
{ok, _} = emqx_mgmt_data_backup:export().

View File

@ -0,0 +1,68 @@
{
"version": "4.0",
"users": [
{
"username": "admin",
"tags": "administrator",
"password": "m/GtjNgri9GILklefVJH8BeTnNE="
}
],
"schemas": [],
"rules": [],
"resources": [
{
"type": "bridge_mqtt",
"id": "bridge",
"description": "bridge",
"created_at": null,
"config": {
"username": "user",
"ssl": "off",
"retry_interval": "20s",
"reconnect_interval": "30s",
"proto_ver": "mqttv4",
"password": "passwd",
"mountpoint": "bridge/emqx/${node}/",
"keyfile": "etc/certs/client-key.pem",
"keepalive": "60s",
"disk_cache": "off",
"clientid": "bridge_aws",
"ciphers": "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384",
"certfile": "etc/certs/client-cert.pem",
"cacertfile": "etc/certs/cacert.pem",
"bridge_mode": true,
"address": "127.0.0.1:1883"
}
},
{
"type": "bridge_rpc",
"id": "rpc",
"description": "rpc",
"created_at": null,
"config": {
"reconnect_interval": "30s",
"pool_size": 8,
"mountpoint": "bridge/emqx/${node}/",
"disk_cache": "off",
"batch_size": 32,
"address": "emqx@127.0.0.1"
}
}
],
"date": "2021-03-30 13:38:39",
"blacklist": [],
"auth_username": [],
"auth_mnesia": [],
"auth_clientid": [],
"apps": [
{
"status": true,
"secret": "public",
"name": "Default",
"id": "admin",
"expired": "undefined",
"desc": "Application user"
}
],
"acl_mnesia": []
}

View File

@ -0,0 +1,70 @@
{
"version": "4.1",
"users": [
{
"username": "admin",
"tags": "administrator",
"password": "3W+nHOkCZLFspENkIvHKzCbHxHI="
}
],
"schemas": [],
"rules": [],
"resources": [
{
"type": "bridge_rpc",
"id": "rpc",
"description": "rpc",
"created_at": null,
"config": {
"reconnect_interval": "30s",
"pool_size": 8,
"mountpoint": "bridge/emqx/${node}/",
"disk_cache": "off",
"batch_size": 32,
"address": "emqx@127.0.0.1"
}
},
{
"type": "bridge_mqtt",
"id": "bridge",
"description": "bridge",
"created_at": null,
"config": {
"username": "user",
"ssl": "off",
"retry_interval": "20s",
"reconnect_interval": "30s",
"proto_ver": "mqttv4",
"pool_size": 8,
"password": "passwd",
"mountpoint": "bridge/emqx/${node}/",
"keyfile": "etc/certs/client-key.pem",
"keepalive": "60s",
"disk_cache": "off",
"clientid": "client",
"ciphers": "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA",
"certfile": "etc/certs/client-cert.pem",
"cacertfile": "etc/certs/cacert.pem",
"bridge_mode": true,
"append": true,
"address": "127.0.0.1:1883"
}
}
],
"date": "2021-03-30 13:52:53",
"blacklist": [],
"auth_username": [],
"auth_mnesia": [],
"auth_clientid": [],
"apps": [
{
"status": true,
"secret": "public",
"name": "Default",
"id": "admin",
"expired": "undefined",
"desc": "Application user"
}
],
"acl_mnesia": []
}

View File

@ -0,0 +1,70 @@
{
"version": "4.2",
"date": "2021-03-23 23:22:30",
"rules": [],
"resources": [
{
"id": "rpc",
"type": "bridge_rpc",
"config": {
"address": "emqx@127.0.0.1",
"batch_size": 32,
"disk_cache": "off",
"mountpoint": "bridge/emqx/${node}/",
"pool_size": 8,
"reconnect_interval": "30s"
},
"created_at": null,
"description": "rpc"
},
{
"id": "bridge",
"type": "bridge_mqtt",
"config": {
"address": "127.0.0.1:1883",
"append": true,
"bridge_mode": false,
"cacertfile": "etc/certs/cacert.pem",
"certfile": "etc/certs/client-cert.pem",
"ciphers": "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA",
"clientid": "client",
"disk_cache": "off",
"keepalive": "60s",
"keyfile": "etc/certs/client-key.pem",
"mountpoint": "bridge/aws/${node}/",
"password": "",
"pool_size": 8,
"proto_ver": "mqttv4",
"reconnect_interval": "30s",
"retry_interval": "20s",
"ssl": "off",
"username": ""
},
"created_at": null,
"description": "bridge"
}
],
"blacklist": [],
"apps": [
{
"id": "admin",
"secret": "public",
"name": "Default",
"desc": "Application user",
"status": true,
"expired": "undefined"
}
],
"users": [
{
"username": "admin",
"password": "mOTRLWt85F7GW+CSiBuRUZiRANw=",
"tags": "administrator"
}
],
"auth_clientid": [],
"auth_username": [],
"auth_mnesia": [],
"acl_mnesia": [],
"schemas": []
}

View File

@ -0,0 +1,76 @@
{
"version": "4.3",
"rules": [],
"resources": [
{
"id": "brigde",
"type": "bridge_mqtt",
"config": {
"address": "127.0.0.1:1883",
"append": true,
"bridge_mode": false,
"cacertfile": {
"filename": "etc/certs/cacert.pem",
"file": ""
},
"certfile": {
"filename": "etc/certs/client-cert.pem",
"file": ""
},
"ciphers": "ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA",
"clientid": "client",
"disk_cache": "off",
"keepalive": "60s",
"keyfile": {
"filename": "etc/certs/client-key.pem",
"file": ""
},
"mountpoint": "bridge/aws/${node}/",
"password": "",
"pool_size": 8,
"proto_ver": "mqttv4",
"reconnect_interval": "30s",
"retry_interval": "20s",
"ssl": false,
"username": ""
},
"created_at": 1616635569139,
"description": "brigde"
},
{
"id": "rpc",
"type": "bridge_rpc",
"config": {
"address": "emqx@127.0.0.1",
"batch_size": 32,
"disk_cache": "off",
"mountpoint": "bridge/emqx/${node}/",
"pool_size": 8,
"reconnect_interval": "30s"
},
"created_at": 1616635583552,
"description": "rpc"
}
],
"blacklist": [],
"apps": [
{
"id": "admin",
"secret": "public",
"name": "Default",
"desc": "Application user",
"status": true,
"expired": "undefined"
}
],
"users": [
{
"username": "admin",
"password": "q8v7hISIMz+iKn/ZuAaogvAxKbA=",
"tags": "administrator"
}
],
"auth_mnesia": [],
"acl_mnesia": [],
"date": "2021-03-25 09:26:34"
}

View File

@ -68,7 +68,7 @@ end_per_suite(_Config) ->
init_per_testcase(data, Config) ->
ok = emqx_dashboard_admin:mnesia(boot),
application:ensure_all_started(emqx_dahboard),
application:ensure_all_started(emqx_dashboard),
ok = emqx_rule_registry:mnesia(boot),
application:ensure_all_started(emqx_rule_engine),
Config;

View File

@ -0,0 +1,131 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_webhook_data_export_import_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-compile([export_all, nowarn_export_all]).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Cfg) ->
emqx_ct_helpers:start_apps([emqx_web_hook,
emqx_bridge_mqtt,
emqx_rule_engine,
emqx_modules,
emqx_management,
emqx_dashboard]),
ok = ekka_mnesia:start(),
ok = emqx_rule_registry:mnesia(boot),
ok = emqx_rule_engine:load_providers(),
Cfg.
end_per_suite(Cfg) ->
emqx_ct_helpers:stop_apps([emqx_dashboard,
emqx_management,
emqx_modules,
emqx_rule_engine,
emqx_bridge_mqtt,
emqx_web_hook]),
Cfg.
get_data_path() ->
emqx_ct_helpers:deps_path(emqx_management, "test/emqx_webhook_data_export_import_SUITE_data/").
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
handle_config(Config, 409) ->
handle_config(Config, 422);
handle_config(Config, 415) ->
handle_config(Config, 422);
handle_config(Config, 422) ->
?assertEqual(<<"http://www.emqx.io">>, maps:get(<<"url">>, Config)),
?assertEqual(<<"POST">>, maps:get(<<"method">>, Config)),
?assertEqual(#{"k" => "v"}, maps:get(<<"headers">>, Config));
handle_config(Config, 423) ->
?assertEqual(<<"http://www.emqx.io">>, maps:get(<<"url">>, Config)),
?assertEqual(<<"POST">>, maps:get(<<"method">>, Config)),
?assertEqual("application/json", maps:get(<<"content_type">>, Config)),
?assertEqual(#{"k" => "v"}, maps:get(<<"headers">>, Config));
handle_config(Config, 425) ->
?assertEqual(<<"http://www.emqx.io">>, maps:get(<<"url">>, Config)),
?assertEqual(<<"POST">>, maps:get(<<"method">>, Config)),
?assertEqual(#{"k" => "v"}, maps:get(<<"headers">>, Config)),
?assertEqual(5, maps:get(<<"connect_timeout">>, Config)),
?assertEqual(5, maps:get(<<"request_timeout">>, Config)),
?assertEqual(8, maps:get(<<"pool_size">>, Config));
handle_config(Config, 430) ->
?assertEqual(<<"http://www.emqx.io">>, maps:get(<<"url">>, Config)),
?assertEqual(<<"POST">>, maps:get(<<"method">>, Config)),
?assertEqual(#{"k" => "v"}, maps:get(<<"headers">>, Config)),
?assertEqual("5s", maps:get(<<"connect_timeout">>, Config)),
?assertEqual("5s", maps:get(<<"request_timeout">>, Config)),
?assertEqual(false, maps:get(<<"verify">>, Config)),
?assertEqual(true, is_map(maps:get(<<"cacertfile">>, Config))),
?assertEqual(true, is_map(maps:get(<<"certfile">>, Config))),
?assertEqual(true, is_map(maps:get(<<"keyfile">>, Config))),
?assertEqual(8, maps:get(<<"pool_size">>, Config));
handle_config(_, _) -> ok.
remove_resource(Id) ->
emqx_rule_registry:remove_resource(Id),
emqx_rule_registry:remove_resource_params(Id).
import(FilePath, Version) ->
Overrides = emqx_json:encode(#{<<"auth.mnesia.as">> => atom_to_binary(clientid)}),
ok = emqx_mgmt_data_backup:import(get_data_path() ++ "/" ++ FilePath, Overrides),
lists:foreach(fun(#resource{id = Id, config = Config} = _Resource) ->
case Id of
"webhook" ->
handle_config(Config, Version),
remove_resource(Id);
_ -> ok
end
end, emqx_rule_registry:get_resources()).
t_import422(_) ->
import("422.json", 422),
{ok, _} = emqx_mgmt_data_backup:export().
t_import423(_) ->
import("423.json", 423),
{ok, _} = emqx_mgmt_data_backup:export().
t_import425(_) ->
import("425.json", 425),
{ok, _} = emqx_mgmt_data_backup:export().
t_import430(_) ->
import("430.json", 430),
{ok, _} = emqx_mgmt_data_backup:export().
t_import409(_) ->
import("409.json", 409),
{ok, _} = emqx_mgmt_data_backup:export().
t_import415(_) ->
import("415.json", 415),
{ok, _} = emqx_mgmt_data_backup:export().

View File

@ -0,0 +1,43 @@
{
"version": "4.0",
"users": [
{
"username": "admin",
"tags": "administrator",
"password": "m/GtjNgri9GILklefVJH8BeTnNE="
}
],
"schemas": [],
"rules": [],
"resources": [
{
"type": "web_hook",
"id": "webhook",
"description": "webhook",
"created_at": null,
"config": {
"headers": {
"k": "v"
},
"method": "POST",
"url": "http://www.emqx.io"
}
}
],
"date": "2021-03-30 13:38:39",
"blacklist": [],
"auth_username": [],
"auth_mnesia": [],
"auth_clientid": [],
"apps": [
{
"status": true,
"secret": "public",
"name": "Default",
"id": "admin",
"expired": "undefined",
"desc": "Application user"
}
],
"acl_mnesia": []
}

View File

@ -0,0 +1,42 @@
{
"version": "4.1",
"users": [
{
"username": "admin",
"tags": "administrator",
"password": "3W+nHOkCZLFspENkIvHKzCbHxHI="
}
],
"schemas": [],
"rules": [],
"resources": [
{
"type": "web_hook",
"id": "webhook",
"description": "webhook",
"created_at": null,
"config": {
"url": "http://www.emqx.io",
"method": "POST",
"headers": {
"k": "v"
}
}
}
],
"date": "2021-03-30 13:52:53",
"blacklist": [],
"auth_username": [],
"auth_clientid": [],
"apps": [
{
"status": true,
"secret": "public",
"name": "Default",
"id": "admin",
"expired": "undefined",
"desc": "Application user"
}
],
"acl_mnesia": []
}

View File

@ -0,0 +1,43 @@
{
"version": "4.2",
"date": "2021-03-23 23:22:30",
"rules": [],
"resources": [
{
"id": "webhook",
"type": "web_hook",
"config": {
"headers": {
"k": "v"
},
"method": "POST",
"url": "http://www.emqx.io"
},
"created_at": null,
"description": "webhook"
}
],
"blacklist": [],
"apps": [
{
"id": "admin",
"secret": "public",
"name": "Default",
"desc": "Application user",
"status": true,
"expired": "undefined"
}
],
"users": [
{
"username": "admin",
"password": "mOTRLWt85F7GW+CSiBuRUZiRANw=",
"tags": "administrator"
}
],
"auth_clientid": [],
"auth_username": [],
"auth_mnesia": [],
"acl_mnesia": [],
"schemas": []
}

View File

@ -0,0 +1,44 @@
{
"version": "4.2",
"date": "2021-03-23 23:29:24",
"rules": [],
"resources": [
{
"id": "webhook",
"type": "web_hook",
"config": {
"headers": {
"k": "v"
},
"method": "POST",
"url": "http://www.emqx.io"
},
"content_type": "application/json",
"created_at": null,
"description": "webhook"
}
],
"blacklist": [],
"apps": [
{
"id": "admin",
"secret": "public",
"name": "Default",
"desc": "Application user",
"status": true,
"expired": "undefined"
}
],
"users": [
{
"username": "admin",
"password": "mOTRLWt85F7GW+CSiBuRUZiRANw=",
"tags": "administrator"
}
],
"auth_clientid": [],
"auth_username": [],
"auth_mnesia": [],
"acl_mnesia": [],
"schemas": []
}

View File

@ -0,0 +1,47 @@
{
"version": "4.2",
"date": "2021-03-24 14:51:22",
"rules": [],
"resources": [
{
"id": "webhook",
"type": "web_hook",
"config": {
"headers": {
"k": "v"
},
"method": "POST",
"url": "http://www.emqx.io"
},
"content_type": "application/json",
"connect_timeout" : 5,
"request_timeout" : 5,
"pool_size" : 8,
"created_at": null,
"description": "webhook"
}
],
"blacklist": [],
"apps": [
{
"id": "admin",
"secret": "public",
"name": "Default",
"desc": "Application user",
"status": true,
"expired": "undefined"
}
],
"users": [
{
"username": "admin",
"password": "mOTRLWt85F7GW+CSiBuRUZiRANw=",
"tags": "administrator"
}
],
"auth_clientid": [],
"auth_username": [],
"auth_mnesia": [],
"acl_mnesia": [],
"schemas": []
}

View File

@ -0,0 +1,52 @@
{
"version": "4.3",
"rules": [],
"resources": [
{
"id": "webhook",
"type": "web_hook",
"config": {
"cacertfile": {
"filename": "",
"file": ""
},
"certfile": {
"filename": "",
"file": ""
},
"keyfile": {
"filename": "",
"file": ""
},
"connect_timeout": "5s",
"pool_size": 8,
"request_timeout": "5s",
"url": "http://www.emqx.io",
"verify": false
},
"created_at": 1616581851001,
"description": "webhook"
}
],
"blacklist": [],
"apps": [
{
"id": "admin",
"secret": "public",
"name": "Default",
"desc": "Application user",
"status": true,
"expired": "undefined"
}
],
"users": [
{
"username": "admin",
"password": "q8v7hISIMz+iKn/ZuAaogvAxKbA=",
"tags": "administrator"
}
],
"auth_mnesia": [],
"acl_mnesia": [],
"date": "2021-03-24 18:31:21"
}

View File

@ -154,6 +154,22 @@
end
end()).
-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
-define(CLUSTER_CALL(Func, Args, ResParttern),
fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of
{ResL, []} ->
case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of
[] -> ResL;
ErrL ->
?LOG(error, "cluster_call error found, ResL: ~p", [ResL]),
throw({Func, ErrL})
end;
{ResL, BadNodes} ->
?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
throw({Func, {failed_on_nodes, BadNodes}})
end end()).
%% Tables
-define(RULE_TAB, emqx_rule).
-define(ACTION_TAB, emqx_rule_action).

View File

@ -215,7 +215,7 @@ delete_rule(RuleId) ->
case emqx_rule_registry:get_rule(RuleId) of
{ok, Rule = #rule{actions = Actions}} ->
try
cluster_call(clear_rule, [Rule]),
_ = ?CLUSTER_CALL(clear_rule, [Rule]),
ok = emqx_rule_registry:remove_rule(Rule)
catch
Error:Reason:ST ->
@ -241,7 +241,7 @@ create_resource(#{type := Type, config := Config0} = Params) ->
ok = emqx_rule_registry:add_resource(Resource),
%% Note that we will return OK in case of resource creation failure,
%% A timer is started to re-start the resource later.
catch cluster_call(init_resource, [M, F, ResId, Config]),
catch _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]),
{ok, Resource};
not_found ->
{error, {resource_type_not_found, Type}}
@ -289,15 +289,14 @@ do_update_resource(#{id := Id, type := Type, description := NewDescription, conf
Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec),
case test_resource(#{type => Type, config => NewConfig}) of
ok ->
Resource = #resource{
_ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]),
emqx_rule_registry:add_resource(#resource{
id = Id,
type = Type,
config = Config,
description = NewDescription,
created_at = erlang:system_time(millisecond)
},
cluster_call(init_resource, [Module, Create, Id, Config]),
emqx_rule_registry:add_resource(Resource);
});
{error, Reason} ->
error({error, Reason})
end
@ -328,8 +327,9 @@ test_resource(#{type := Type, config := Config0}) ->
Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
ResId = resource_id(),
try
cluster_call(init_resource, [ModC, Create, ResId, Config]),
cluster_call(clear_resource, [ModD, Destroy, ResId])
_ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]),
_ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
ok
catch
throw:Reason -> {error, Reason}
end;
@ -366,7 +366,8 @@ delete_resource(ResId) ->
= emqx_rule_registry:find_resource_type(ResType),
try
ok = emqx_rule_registry:remove_resource(ResId),
cluster_call(clear_resource, [ModD, Destroy, ResId])
_ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
ok
catch
throw:Reason -> {error, Reason}
end;
@ -430,7 +431,13 @@ prepare_action(#{name := Name, args := Args0} = Action, NeedInit) ->
{ok, #action{module = Mod, on_create = Create, params_spec = ParamSpec}} ->
Args = emqx_rule_validator:validate_params(Args0, ParamSpec),
ActionInstId = maps:get(id, Action, action_instance_id(Name)),
NeedInit andalso cluster_call(init_action, [Mod, Create, ActionInstId, with_resource_params(Args)]),
case NeedInit of
true ->
_ = ?CLUSTER_CALL(init_action, [Mod, Create, ActionInstId,
with_resource_params(Args)]),
ok;
false -> ok
end,
#action_instance{
id = ActionInstId, name = Name, args = Args,
fallbacks = prepare_actions(maps:get(fallbacks, Action, []), NeedInit)
@ -481,7 +488,7 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) ->
may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) ->
%% prepare new actions before removing old ones
NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)),
cluster_call(clear_actions, [OldActions]),
_ = ?CLUSTER_CALL(clear_actions, [OldActions]),
may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params));
may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params
Rule.
@ -512,20 +519,6 @@ gen_id(Prefix, TestFun) ->
action_instance_id(ActionName) ->
iolist_to_binary([atom_to_list(ActionName), "_", integer_to_list(erlang:system_time())]).
cluster_call(Func, Args) ->
case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of
{ResL, []} ->
case lists:filter(fun(ok) -> false; (_) -> true end, ResL) of
[] -> ok;
ErrL ->
?LOG(error, "cluster_call error found, ResL: ~p", [ResL]),
throw({func_fail(Func), ErrL})
end;
{ResL, BadNodes} ->
?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
throw({func_fail(Func), {failed_on_nodes, BadNodes}})
end.
init_resource(Module, OnCreate, ResId, Config) ->
Params = ?RAISE(Module:OnCreate(ResId, Config),
{{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
@ -642,15 +635,12 @@ refresh_actions(Actions, Pred) ->
true ->
{ok, #action{module = Mod, on_create = Create}}
= emqx_rule_registry:find_action(ActName),
cluster_call(init_action, [Mod, Create, Id, with_resource_params(Args)]),
_ = ?CLUSTER_CALL(init_action, [Mod, Create, Id, with_resource_params(Args)]),
refresh_actions(Fallbacks, Pred);
false -> ok
end
end, Actions).
func_fail(Func) when is_atom(Func) ->
list_to_atom(atom_to_list(Func) ++ "_failure").
find_type(ResId) ->
{ok, #resource{type = Type}} = emqx_rule_registry:find_resource(ResId),
{ok, Type}.

View File

@ -273,14 +273,13 @@ do_create_resource(Create, ParsedParams) ->
return({ok, record_to_map(Resource)});
{error, {resource_type_not_found, Type}} ->
return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)});
{error, {init_resource_failure, _}} ->
{error, {init_resource, _}} ->
return({error, 500, <<"Init resource failure!">>});
{error, Reason} ->
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
return({error, 400, ?ERR_BADARGS(Reason)})
end.
list_resources(#{}, _Params) ->
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
Data = lists:map(fun(Res = #{id := Id}) ->
@ -345,7 +344,7 @@ update_resource(#{id := Id}, NewParams) ->
{error, not_found} ->
?LOG(error, "Resource not found: ~0p", [Id]),
return({error, 400, <<"Resource not found:", Id/binary>>});
{error, {init_resource_failure, _}} ->
{error, {init_resource, _}} ->
?LOG(error, "Init resource failure: ~0p", [Id]),
return({error, 500, <<"Init resource failure:", Id/binary>>});
{error, {dependency_exists, RuleId}} ->

View File

@ -853,6 +853,7 @@ time_unit(<<"nanosecond">>) -> nanosecond.
%% Here the emqx_rule_funcs module acts as a proxy, forwarding
%% the function handling to the worker module.
%% @end
-ifdef(EMQX_ENTERPRISE).
'$handle_undefined_function'(schema_decode, [SchemaId, Data|MoreArgs]) ->
emqx_schema_parser:decode(SchemaId, Data, MoreArgs);
'$handle_undefined_function'(schema_decode, Args) ->
@ -868,6 +869,13 @@ time_unit(<<"nanosecond">>) -> nanosecond.
'$handle_undefined_function'(Fun, Args) ->
error({sql_function_not_supported, function_literal(Fun, Args)}).
-else.
'$handle_undefined_function'(sprintf, [Format|Args]) ->
erlang:apply(fun sprintf_s/2, [Format, Args]);
'$handle_undefined_function'(Fun, Args) ->
error({sql_function_not_supported, function_literal(Fun, Args)}).
-endif. % EMQX_ENTERPRISE
map_path(Key) ->
{path, [{key, P} || P <- string:split(Key, ".", all)]}.

View File

@ -67,6 +67,10 @@
, unregister_resource_types_of/1
]).
-export([ load_hooks_for_rule/1
, unload_hooks_for_rule/1
]).
%% for debug purposes
-export([dump/0]).
@ -216,8 +220,8 @@ remove_rules(Rules) ->
gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL).
%% @private
insert_rule(Rule = #rule{for = Topics}) ->
lists:foreach(fun emqx_rule_events:load/1, Topics),
insert_rule(Rule) ->
_ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]),
mnesia:write(?RULE_TAB, Rule, write).
%% @private
@ -226,15 +230,21 @@ delete_rule(RuleId) when is_binary(RuleId) ->
{ok, Rule} -> delete_rule(Rule);
not_found -> ok
end;
delete_rule(Rule = #rule{id = Id, for = Topics}) ->
delete_rule(Rule) ->
_ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]),
mnesia:delete_object(?RULE_TAB, Rule, write).
load_hooks_for_rule(#rule{for = Topics}) ->
lists:foreach(fun emqx_rule_events:load/1, Topics).
unload_hooks_for_rule(#rule{id = Id, for = Topics}) ->
lists:foreach(fun(Topic) ->
case get_rules_with_same_event(Topic) of
[#rule{id = Id}] -> %% we are now deleting the last rule
emqx_rule_events:unload(Topic);
_ -> ok
end
end, Topics),
mnesia:delete_object(?RULE_TAB, Rule, write).
end, Topics).
%%------------------------------------------------------------------------------
%% Action Management

View File

@ -269,10 +269,10 @@ message_type(Type) ->
io_lib:format("Unknown Type ~p", [Type]).
format(?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)) ->
io_lib:format("mqtt_sn_message SN_PUBLISH, ~p, TopicId=~w, MsgId=~w, Payload=~w",
io_lib:format("mqtt_sn_message SN_PUBLISH, ~s, TopicId=~w, MsgId=~w, Payload=~w",
[format_flag(Flags), TopicId, MsgId, Data]);
format(?SN_PUBACK_MSG(Flags, MsgId, ReturnCode)) ->
io_lib:format("mqtt_sn_message SN_PUBACK, ~p, MsgId=~w, ReturnCode=~w",
io_lib:format("mqtt_sn_message SN_PUBACK, ~s, MsgId=~w, ReturnCode=~w",
[format_flag(Flags), MsgId, ReturnCode]);
format(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)) ->
io_lib:format("mqtt_sn_message SN_PUBCOMP, MsgId=~w", [MsgId]);
@ -281,13 +281,13 @@ format(?SN_PUBREC_MSG(?SN_PUBREC, MsgId)) ->
format(?SN_PUBREC_MSG(?SN_PUBREL, MsgId)) ->
io_lib:format("mqtt_sn_message SN_PUBREL, MsgId=~w", [MsgId]);
format(?SN_SUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
io_lib:format("mqtt_sn_message SN_SUBSCRIBE, ~p, MsgId=~w, TopicId=~w",
io_lib:format("mqtt_sn_message SN_SUBSCRIBE, ~s, MsgId=~w, TopicId=~w",
[format_flag(Flags), Msgid, Topic]);
format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) ->
io_lib:format("mqtt_sn_message SN_SUBACK, ~p, MsgId=~w, TopicId=~w, ReturnCode=~w",
io_lib:format("mqtt_sn_message SN_SUBACK, ~s, MsgId=~w, TopicId=~w, ReturnCode=~w",
[format_flag(Flags), MsgId, TopicId, ReturnCode]);
format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
io_lib:format("mqtt_sn_message SN_UNSUBSCRIBE, ~p, MsgId=~w, TopicId=~w",
io_lib:format("mqtt_sn_message SN_UNSUBSCRIBE, ~s, MsgId=~w, TopicId=~w",
[format_flag(Flags), Msgid, Topic]);
format(?SN_UNSUBACK_MSG(MsgId)) ->
io_lib:format("mqtt_sn_message SN_UNSUBACK, MsgId=~w", [MsgId]);

View File

@ -21,6 +21,9 @@
-include("emqx_sn.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[MQTT-SN]").
%% API.
-export([start_link/3]).
@ -97,8 +100,6 @@
-define(STAT_TIMEOUT, 10000).
-define(IDLE_TIMEOUT, 30000).
-define(DEFAULT_CHAN_OPTIONS, [{max_packet_size, 256}, {zone, external}]).
-define(LOG(Level, Format, Args, State),
emqx_logger:Level("MQTT-SN(~s): " ++ Format, [esockd:format(State#state.peername) | Args])).
-define(NEG_QOS_CLIENT_ID, <<"NegQoS-Client">>).
@ -113,6 +114,12 @@
conn_mod => ?MODULE
}).
-define(is_non_error_reason(Reason),
Reason =:= normal;
Reason =:= idle_timeout;
Reason =:= asleep_timeout;
Reason =:= keepalive_timeout).
%%--------------------------------------------------------------------
%% Exported APIs
%%--------------------------------------------------------------------
@ -159,6 +166,7 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
enable_qos3 = EnableQos3,
idle_timeout = IdleTimeout
},
emqx_logger:set_metadata_peername(esockd:format(Peername)),
{ok, idle, State, [IdleTimeout]};
{error, Reason} when Reason =:= enotconn;
Reason =:= einval;
@ -186,7 +194,7 @@ idle(cast, {incoming, ?SN_DISCONNECT_MSG(_Duration)}, State) ->
{keep_state, State, State#state.idle_timeout};
idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = #state{enable_qos3 = false}) ->
?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!", [], State),
?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!"),
{keep_state, State#state.idle_timeout};
idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
@ -204,7 +212,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
false ->
ok
end,
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId], State),
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId]),
{keep_state, State#state.idle_timeout};
idle(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
@ -219,7 +227,7 @@ idle(cast, {connack, ConnAck}, State) ->
{next_state, connected, State};
idle(timeout, _Timeout, State) ->
stop({shutdown, idle_timeout}, State);
stop(idle_timeout, State);
idle(EventType, EventContent, State) ->
handle_event(EventType, EventContent, idle, State).
@ -253,8 +261,8 @@ wait_for_will_topic(cast, {connack, ConnAck}, State) ->
ok = handle_outgoing(ConnAck, State),
{next_state, connected, State};
wait_for_will_topic(cast, Event, State) ->
?LOG(error, "wait_for_will_topic UNEXPECTED Event: ~p", [Event], State),
wait_for_will_topic(cast, Event, _State) ->
?LOG(error, "wait_for_will_topic UNEXPECTED Event: ~p", [Event]),
keep_state_and_data;
wait_for_will_topic(EventType, EventContent, State) ->
@ -288,13 +296,13 @@ connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)},
State = #state{clientid = ClientId, registry = Registry}) ->
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
TopicId when is_integer(TopicId) ->
?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId], State),
?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]),
send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State);
{error, too_large} ->
?LOG(error, "TopicId is full! ClientId=~p, TopicName=~p", [ClientId, TopicName], State),
?LOG(error, "TopicId is full! ClientId=~p, TopicName=~p", [ClientId, TopicName]),
send_message(?SN_REGACK_MSG(?SN_INVALID_TOPIC_ID, MsgId, ?SN_RC_NOT_SUPPORTED), State);
{error, wildcard_topic} ->
?LOG(error, "wildcard topic can not be registered! ClientId=~p, TopicName=~p", [ClientId, TopicName], State),
?LOG(error, "wildcard topic can not be registered! ClientId=~p, TopicName=~p", [ClientId, TopicName]),
send_message(?SN_REGACK_MSG(?SN_INVALID_TOPIC_ID, MsgId, ?SN_RC_NOT_SUPPORTED), State)
end,
{keep_state, State};
@ -305,7 +313,7 @@ connected(cast, {incoming, ?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)},
Skip = (EnableQoS3 =:= false) andalso (QoS =:= ?QOS_NEG1),
case Skip of
true ->
?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in connected mode!", [], State),
?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in connected mode!"),
{keep_state, State};
false ->
do_publish(TopicIdType, TopicId, Data, Flags, MsgId, State)
@ -316,7 +324,7 @@ connected(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, RC)}, State) ->
connected(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State)
when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP ->
do_pubrec(PubRec, MsgId, State);
do_pubrec(PubRec, MsgId, connected, State);
connected(cast, {incoming, ?SN_SUBSCRIBE_MSG(Flags, MsgId, TopicId)}, State) ->
#mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType} = Flags,
@ -333,7 +341,7 @@ connected(cast, {incoming, ?SN_REGACK_MSG(_TopicId, _MsgId, ?SN_RC_ACCEPTED)}, S
{keep_state, State};
connected(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p",
[TopicId, MsgId, ReturnCode], State),
[TopicId, MsgId, ReturnCode]),
{keep_state, State};
connected(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) ->
@ -374,13 +382,13 @@ connected(cast, {connack, ConnAck}, State) ->
connected(cast, {shutdown, Reason, Packet}, State) ->
ok = handle_outgoing(Packet, State),
{stop, {shutdown, Reason}, State};
stop(Reason, State);
connected(cast, {shutdown, Reason}, State) ->
{stop, {shutdown, Reason}, State};
stop(Reason, State);
connected(cast, {close, Reason}, State) ->
?LOG(debug, "Force to close the socket due to ~p", [Reason], State),
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
connected(EventType, EventContent, State) ->
@ -390,7 +398,7 @@ asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) ->
ok = send_message(?SN_DISCONNECT_MSG(undefined), State),
case Duration of
undefined ->
handle_incoming(?PACKET(?DISCONNECT), State);
handle_incoming(?DISCONNECT_PACKET(), State);
_Other ->
goto_asleep_state(Duration, State)
end;
@ -404,19 +412,19 @@ asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)},
case ClientIdPing of
ClientId ->
inc_ping_counter(),
case emqx_session:dequeue(emqx_channel:get_session(Channel)) of
{ok, Session0} ->
case emqx_session:replay(emqx_channel:get_session(Channel)) of
{ok, [], Session0} ->
send_message(?SN_PINGRESP_MSG(), State),
{keep_state, State#state{
channel = emqx_channel:set_session(Session0, Channel)}};
{ok, Delivers, Session0} ->
Events = [emqx_message:to_packet(PckId, Msg) || {PckId, Msg} <- Delivers]
++ [try_goto_asleep],
{next_state, awake, State#state{
channel = emqx_channel:set_session(Session0, Channel),
has_pending_pingresp = true}, outgoing_events(Events)}
{ok, Publishes, Session0} ->
{Packets, Channel1} = emqx_channel:do_deliver(Publishes,
emqx_channel:set_session(Session0, Channel)),
{next_state, awake,
State#state{channel = Channel1, has_pending_pingresp = true},
outgoing_events(Packets ++ [try_goto_asleep])}
end;
_Other ->
_Other ->
{next_state, asleep, State}
end;
@ -425,7 +433,7 @@ asleep(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
asleep(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State)
when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP ->
do_pubrec(PubRec, MsgId, State);
do_pubrec(PubRec, MsgId, asleep, State);
% NOTE: what about following scenario:
% 1) client go to sleep
@ -450,7 +458,7 @@ awake(cast, {incoming, ?SN_REGACK_MSG(_TopicId, _MsgId, ?SN_RC_ACCEPTED)}, State
awake(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
?LOG(error, "client does not accept register TopicId=~p, MsgId=~p, ReturnCode=~p",
[TopicId, MsgId, ReturnCode], State),
[TopicId, MsgId, ReturnCode]),
{keep_state, State};
awake(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
@ -463,15 +471,21 @@ awake(cast, {outgoing, Packet}, State) ->
awake(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
do_puback(TopicId, MsgId, ReturnCode, awake, State);
awake(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State)
when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP ->
do_pubrec(PubRec, MsgId, awake, State);
awake(cast, try_goto_asleep, State=#state{channel = Channel,
has_pending_pingresp = PingPending}) ->
case emqx_mqueue:is_empty(emqx_session:info(mqueue, emqx_channel:get_session(Channel))) of
true when PingPending =:= true ->
Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)),
case emqx_inflight:size(Inflight) of
0 when PingPending =:= true ->
send_message(?SN_PINGRESP_MSG(), State),
goto_asleep_state(State#state{has_pending_pingresp = false});
true when PingPending =:= false ->
0 when PingPending =:= false ->
goto_asleep_state(State);
false -> keep_state_and_data
_Size ->
keep_state_and_data
end;
awake(EventType, EventContent, State) ->
@ -494,26 +508,25 @@ handle_event({call, From}, Req, _StateName, State) ->
handle_event(info, {datagram, SockPid, Data}, StateName,
State = #state{sockpid = SockPid, channel = _Channel}) ->
?LOG(debug, "RECV ~p", [Data], State),
?LOG(debug, "RECV ~0p", [Data]),
Oct = iolist_size(Data),
inc_counter(recv_oct, Oct),
try emqx_sn_frame:parse(Data) of
{ok, Msg} ->
inc_counter(recv_cnt, 1),
?LOG(info, "RECV ~s at state ~s",
[emqx_sn_frame:format(Msg), StateName], State),
?LOG(info, "RECV ~s at state ~s", [emqx_sn_frame:format(Msg), StateName]),
{keep_state, State, next_event({incoming, Msg})}
catch
error:Error:Stacktrace ->
?LOG(info, "Parse frame error: ~p at state ~s, Stacktrace: ~p",
[Error, StateName, Stacktrace], State),
shutdown(frame_error, State)
[Error, StateName, Stacktrace]),
stop(frame_error, State)
end;
handle_event(info, {deliver, _Topic, Msg}, asleep,
State = #state{channel = Channel}) ->
% section 6.14, Support of sleeping clients
?LOG(debug, "enqueue downlink message in asleep state Msg=~p", [Msg], State),
?LOG(debug, "enqueue downlink message in asleep state Msg=~0p", [Msg]),
Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)),
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
@ -541,13 +554,16 @@ handle_event(info, {timeout, TRef, TMsg}, _StateName, State) ->
handle_timeout(TRef, TMsg, State);
handle_event(info, asleep_timeout, asleep, State) ->
?LOG(debug, "asleep timer timeout, shutdown now", [], State),
?LOG(debug, "asleep timer timeout, shutdown now"),
stop(asleep_timeout, State);
handle_event(info, asleep_timeout, StateName, State) ->
?LOG(debug, "asleep timer timeout on StateName=~p, ignore it", [StateName], State),
?LOG(debug, "asleep timer timeout on StateName=~p, ignore it", [StateName]),
{keep_state, State};
handle_event(cast, {close, Reason}, _StateName, State) ->
stop(Reason, State);
handle_event(cast, {event, connected}, _StateName, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
@ -566,8 +582,8 @@ handle_event(cast, {event, _Other}, _StateName, State = #state{channel = Channel
{keep_state, State};
handle_event(EventType, EventContent, StateName, State) ->
?LOG(error, "StateName: ~s, Unexpected Event: ~p",
[StateName, {EventType, EventContent}], State),
?LOG(error, "StateName: ~s, Unexpected Event: ~0p",
[StateName, {EventType, EventContent}]),
{keep_state, State}.
terminate(Reason, _StateName, #state{clientid = ClientId,
@ -597,7 +613,7 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
{reply, Reply, NChannel} ->
{reply, Reply, State#state{channel = NChannel}};
{shutdown, Reason, Reply, NChannel} ->
shutdown(Reason, Reply, State#state{channel = NChannel})
stop(Reason, Reply, State#state{channel = NChannel})
end.
handle_info(Info, State = #state{channel = Channel}) ->
@ -614,11 +630,11 @@ handle_return({ok, NChannel}, State, AddEvents) ->
handle_return({ok, Replies, NChannel}, State, AddEvents) ->
{keep_state, State#state{channel = NChannel}, outgoing_events(append(Replies, AddEvents))};
handle_return({shutdown, Reason, NChannel}, State, _AddEvents) ->
stop({shutdown, Reason}, State#state{channel = NChannel});
stop(Reason, State#state{channel = NChannel});
handle_return({shutdown, Reason, OutPacket, NChannel}, State, _AddEvents) ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(OutPacket, NState),
stop({shutdown, Reason}, NState).
stop(Reason, NState).
outgoing_events(Actions) ->
lists:map(fun outgoing_event/1, Actions).
@ -750,8 +766,8 @@ send_connack(State) ->
send_message(?SN_CONNACK_MSG(?SN_RC_ACCEPTED), State).
send_message(Msg = #mqtt_sn_message{type = Type},
State = #state{sockpid = SockPid, peername = Peername}) ->
?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)], State),
#state{sockpid = SockPid, peername = Peername}) ->
?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]),
inc_outgoing_stats(Type),
Data = emqx_sn_frame:serialize(Msg),
ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
@ -761,35 +777,35 @@ send_message(Msg = #mqtt_sn_message{type = Type},
goto_asleep_state(State) ->
goto_asleep_state(undefined, State).
goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer}) ->
?LOG(debug, "goto_asleep_state Duration=~p", [Duration], State),
?LOG(debug, "goto_asleep_state Duration=~p", [Duration]),
NewTimer = emqx_sn_asleep_timer:ensure(Duration, AsleepTimer),
{next_state, asleep, State#state{asleep_timer = NewTimer}, hibernate}.
%%--------------------------------------------------------------------
%% Helper funcs
%%--------------------------------------------------------------------
-compile({inline, [shutdown/2, shutdown/3]}).
shutdown(Reason, State) ->
?LOG(error, "shutdown due to ~p", [Reason], State),
stop({shutdown, Reason}, State).
shutdown(Reason, Reply, State) ->
?LOG(error, "shutdown due to ~p", [Reason], State),
stop({shutdown, Reason}, Reply, State).
-compile({inline, [stop/2, stop/3]}).
stop({shutdown, Reason}, State) ->
stop(Reason, State);
stop(Reason, State) ->
?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
case Reason of
%% FIXME: The Will-Msg should publish when a Session terminated!
asleep_timeout -> do_publish_will(State);
{shutdown, keepalive_timeout} -> do_publish_will(State);
_ -> ok
asleep_timeout -> do_publish_will(State);
keepalive_timeout -> do_publish_will(State);
_ -> ok
end,
{stop, Reason, State}.
{stop, {shutdown, Reason}, State}.
stop({shutdown, Reason}, Reply, State) ->
stop(Reason, Reply, State);
stop(Reason, Reply, State) ->
{stop, Reason, Reply, State}.
?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
{stop, {shutdown, Reason}, Reply, State}.
stop_log_level(Reason) when ?is_non_error_reason(Reason) ->
debug;
stop_log_level(_) ->
error.
mqttsn_to_mqtt(?SN_PUBACK, MsgId) ->
?PUBACK_PACKET(MsgId);
@ -801,6 +817,7 @@ mqttsn_to_mqtt(?SN_PUBCOMP, MsgId) ->
?PUBCOMP_PACKET(MsgId).
do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
emqx_logger:set_metadata_clientid(ClientId),
%% 6.6 Clients Publish Procedure
%% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message
%% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange
@ -833,6 +850,7 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
clientid = OldClientId,
registry = Registry,
channel = Channel}) ->
emqx_logger:set_metadata_clientid(ClientId),
#mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags,
NChannel = case CleanStart of
true ->
@ -922,7 +940,7 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) ->
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
State=#state{clientid = ClientId, registry = Registry}) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS, State),
NewQoS = get_corrected_qos(QoS),
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined ->
(NewQoS =/= ?QOS_0) andalso send_message(?SN_PUBACK_MSG(TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), State),
@ -932,7 +950,7 @@ do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
end;
do_publish(?SN_SHORT_TOPIC, STopicName, Data, Flags, MsgId, State) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS, State),
NewQoS = get_corrected_qos(QoS),
<<TopicId:16>> = STopicName ,
case emqx_topic:wildcard(STopicName) of
true ->
@ -975,22 +993,22 @@ do_puback(TopicId, MsgId, ReturnCode, StateName,
{keep_state, State}
end;
_ ->
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode], State),
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
{keep_state, State}
end.
do_pubrec(PubRec, MsgId, State) ->
handle_incoming(mqttsn_to_mqtt(PubRec, MsgId), State).
do_pubrec(PubRec, MsgId, StateName, State) ->
handle_incoming(mqttsn_to_mqtt(PubRec, MsgId), StateName, State).
proto_subscribe(TopicName, QoS, MsgId, TopicId, State) ->
?LOG(debug, "subscribe Topic=~p, MsgId=~p, TopicId=~p",
[TopicName, MsgId, TopicId], State),
[TopicName, MsgId, TopicId]),
enqueue_msgid(suback, MsgId, TopicId),
SubOpts = maps:put(qos, QoS, ?DEFAULT_SUBOPTS),
handle_incoming(?SUBSCRIBE_PACKET(MsgId, [{TopicName, SubOpts}]), State).
proto_unsubscribe(TopicName, MsgId, State) ->
?LOG(debug, "unsubscribe Topic=~p, MsgId=~p", [TopicName, MsgId], State),
?LOG(debug, "unsubscribe Topic=~p, MsgId=~p", [TopicName, MsgId]),
handle_incoming(?UNSUBSCRIBE_PACKET(MsgId, [TopicName]), State).
proto_publish(TopicName, Data, Dup, QoS, Retain, MsgId, TopicId, State) ->
@ -998,7 +1016,7 @@ proto_publish(TopicName, Data, Dup, QoS, Retain, MsgId, TopicId, State) ->
Publish = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = Dup, qos = QoS, retain = Retain},
variable = #mqtt_packet_publish{topic_name = TopicName, packet_id = MsgId},
payload = Data},
?LOG(debug, "[publish] Msg: ~p~n", [Publish], State),
?LOG(debug, "[publish] Msg: ~0p~n", [Publish]),
handle_incoming(Publish, State).
update_will_topic(undefined, #mqtt_sn_flags{qos = QoS, retain = Retain}, Topic) ->
@ -1021,11 +1039,10 @@ dequeue_msgid(suback, MsgId) ->
dequeue_msgid(puback, MsgId) ->
erase({puback, MsgId}).
get_corrected_qos(?QOS_NEG1, State) ->
?LOG(debug, "Receive a publish with QoS=-1", [], State),
get_corrected_qos(?QOS_NEG1) ->
?LOG(debug, "Receive a publish with QoS=-1"),
?QOS_0;
get_corrected_qos(QoS, _State) ->
get_corrected_qos(QoS) ->
QoS.
get_topic_id(Type, MsgId) ->
@ -1037,17 +1054,18 @@ get_topic_id(Type, MsgId) ->
handle_incoming(Packet, State) ->
handle_incoming(Packet, unknown, State).
handle_incoming(?PUBACK_PACKET(_) = Packet, awake, State) ->
handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake, State) ->
Result = channel_handle_in(Packet, State),
handle_return(Result, State, [try_goto_asleep]);
handle_incoming(Packet, _StName, State) ->
Result = channel_handle_in(Packet, State),
handle_return(Result, State).
channel_handle_in(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) ->
_ = inc_incoming_stats(Type),
ok = emqx_metrics:inc_recv(Packet),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)], State),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
emqx_channel:handle_in(Packet, Channel).
handle_outgoing(Packets, State) when is_list(Packets) ->
@ -1057,7 +1075,7 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload),
State = #state{clientid = ClientId, registry = Registry}) ->
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
MsgId = message_id(PacketId),
?LOG(debug, "Handle outgoing: ~p", [PubPkt], State),
?LOG(debug, "Handle outgoing: ~0p", [PubPkt]),
(emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) == undefined)
andalso (byte_size(TopicName) =/= 2)
@ -1072,8 +1090,8 @@ handle_outgoing(Packet, State) ->
register_and_notify_client(TopicName, Payload, Dup, QoS, Retain, MsgId, ClientId,
State = #state{registry = Registry}) ->
TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName),
?LOG(debug, "register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, Retain=~p, MsgId=~p",
[TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId], State),
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
send_register(TopicName, TopicId, MsgId, State).
message_id(undefined) ->

View File

@ -1251,7 +1251,10 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) ->
UdpData = wrap_receive_response(Socket),
MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData),
send_pubrec_msg(Socket, MsgId_udp),
?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)),
send_pubcomp_msg(Socket, MsgId_udp),
%% verify the pingresp is received after receiving all the buffered qos2 msgs
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
gen_udp:close(Socket).
@ -1483,6 +1486,10 @@ t_awake_test02_to_disconnected(_) ->
timer:sleep(100),
% goto awake state
send_pingreq_msg(Socket, <<"test">>),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% send disconnect message, and goto disconnected state
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -44,16 +44,21 @@ groups() ->
{ipv6https, [sequence], Cases}].
init_per_group(Name, Config) ->
application:ensure_all_started(emqx_management),
set_special_cfgs(),
case Name of
http ->
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_http/1);
emqx_ct_helpers:start_apps([emqx_web_hook, emqx_modules, emqx_management,
emqx_rule_engine], fun set_special_configs_http/1);
https ->
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_https/1);
emqx_ct_helpers:start_apps([emqx_web_hook, emqx_modules, emqx_management,
emqx_rule_engine], fun set_special_configs_https/1);
ipv6http ->
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_ipv6_http/1);
emqx_ct_helpers:start_apps([emqx_web_hook, emqx_management,
emqx_rule_engine], fun set_special_configs_ipv6_http/1);
ipv6https ->
emqx_ct_helpers:start_apps([emqx_web_hook], fun set_special_configs_ipv6_https/1)
emqx_ct_helpers:start_apps([emqx_web_hook, emqx_management,
emqx_rule_engine], fun set_special_configs_ipv6_https/1)
end,
Config.

View File

@ -325,9 +325,6 @@ peer2addr({Host, _}) ->
peer2addr(Host) ->
list_to_binary(inet:ntoa(Host)).
ensure_to_binary(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
ensure_to_binary(Bin) when is_binary(Bin) -> Bin.
stringfy({shutdown, Reason}) ->
stringfy(Reason);
stringfy(Term) when is_atom(Term); is_binary(Term) ->
@ -343,17 +340,6 @@ receive_http_request_body() ->
exit(waiting_message_timeout)
end.
receive_http_request_bodys() ->
receive_http_request_bodys_([]).
receive_http_request_bodys_(Acc) ->
receive
{post, _, _, Body} ->
receive_http_request_bodys_([Body|Acc])
after 1000 ->
lists:reverse(Acc)
end.
filter_topictab(TopicTab, {undefined}) ->
TopicTab;
filter_topictab(TopicTab, {TopicFilter}) ->
@ -399,9 +385,6 @@ topic_filter_env() ->
payload_encode() ->
oneof([base62, base64, undefined]).
http_code() ->
oneof([socket_closed_remotely, others]).
disconnected_conninfo() ->
?LET(Info, conninfo(),
begin

View File

@ -213,8 +213,20 @@ generate_config() {
if [ "${EMQX_LICENSE_CONF:-}" != "" ]; then
EMQX_LICENSE_CONF_OPTION="-i ${EMQX_LICENSE_CONF}"
fi
set +e
# shellcheck disable=SC2086
CONFIG_ARGS=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema $EMQX_LICENSE_CONF_OPTION -c "$RUNNER_ETC_DIR"/emqx.conf -d "$RUNNER_DATA_DIR"/configs generate)
CUTTLEFISH_OUTPUT="$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -v -i "$REL_DIR"/emqx.schema $EMQX_LICENSE_CONF_OPTION -c "$RUNNER_ETC_DIR"/emqx.conf -d "$RUNNER_DATA_DIR"/configs generate)"
# shellcheck disable=SC2181
RESULT=$?
set -e
if [ $RESULT -gt 0 ]; then
echo "$CUTTLEFISH_OUTPUT"
exit $RESULT
fi
# print override from environment variables (EMQX_*)
echo "$CUTTLEFISH_OUTPUT" | sed -e '$d'
CONFIG_ARGS=$(echo "$CUTTLEFISH_OUTPUT" | tail -n 1)
## Merge cuttlefish generated *.args into the vm.args
CUTTLE_GEN_ARG_FILE=$(echo "$CONFIG_ARGS" | sed -n 's/^.*\(vm_args[[:space:]]\)//p' | awk '{print $1}')

View File

@ -876,6 +876,30 @@ zone.external.enable_flapping_detect = off
## Example: 100KB incoming per 10 seconds.
#zone.external.rate_limit.conn_bytes_in = "100KB,10s"
## Whether to alarm the congested connections.
##
## Sometimes the mqtt connection (usually an MQTT subscriber) may get "congested" because
## there're too many packets to sent. The socket trys to buffer the packets until the buffer is
## full. If more packets comes after that, the packets will be "pending" in a queue
## and we consider the connection is "congested".
##
## Enable this to send an alarm when there's any bytes pending in the queue. You could set
## the `listener.tcp.<ZoneName>.sndbuf` to a larger value if the alarm is triggered too often.
##
## The name of the alarm is of format "conn_congestion/<ClientID>/<Username>".
## Where the <ClientID> is the client-id of the congested MQTT connection.
## And the <Username> is the username or "unknown_user" of not provided by the client.
## Default: off
#zone.external.conn_congestion.alarm = off
## Won't clear the congested alarm in how long time.
## The alarm is cleared only when there're no pending bytes in the queue, and also it has been
## `min_alarm_sustain_duration` time since the last time we considered the connection is "congested".
##
## This is to avoid clearing and sending the alarm again too often.
## Default: 1m
#zone.external.conn_congestion.min_alarm_sustain_duration = 1m
## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message.
##

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_telemetry_sup]},
{applications, [kernel,stdlib,emqx_modules]},
{applications, [kernel,stdlib]},
{mod, {emqx_telemetry_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -19,6 +19,15 @@ e.g. For an Erlang plugin named `plugin_foo`:
}.
```
Note: The `-emqx_plugin(?MODULE)` attribute should be added to
`<plugin-name>_app.erl` file to indicate that this is an EMQ X Broker plugin.
For example:
```erlang
%% plugin_foo_app.erl
-emqx_plugin(?MODULE)
```
## Build a release
```

View File

@ -1015,6 +1015,16 @@ end}.
{datatype, string}
]}.
{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [
{datatype, flag},
{default, off}
]}.
{mapping, "zone.$name.conn_congestion.min_alarm_sustain_duration", "emqx.zones", [
{default, "1m"},
{datatype, {duration, ms}}
]}.
{mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [
{datatype, string}
]}.
@ -1144,6 +1154,10 @@ end}.
{ratelimit, {conn_messages_in, Ratelimit(Val)}};
(["rate_limit", "conn_bytes_in"], Val) ->
{ratelimit, {conn_bytes_in, Ratelimit(Val)}};
(["conn_congestion", "alarm"], Val) ->
{conn_congestion_alarm_enabled, Val};
(["conn_congestion", "min_alarm_sustain_duration"], Val) ->
{conn_congestion_min_alarm_sustain_duration, Val};
(["quota", "conn_messages_routing"], Val) ->
{quota, {conn_messages_routing, Ratelimit(Val)}};
(["quota", "overall_messages_routing"], Val) ->

View File

@ -42,7 +42,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.0"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}
, {cuttlefish, {git, "https://github.com/z8674558/cuttlefish", {branch, "fix-eunit"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {branch, "hocon"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.0"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
@ -52,7 +52,7 @@
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {getopt, "1.0.1"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.8.2"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.10.0"}}}
]}.
{xref_ignores,

View File

@ -44,9 +44,7 @@ filter_extra_deps([{Plugin, _} = P | More], Filter, Acc) ->
overrides() ->
[ {add, [ {extra_src_dirs, [{"etc", [{recursive,true}]}]}
, {erl_opts, [ deterministic
, {compile_info, [{emqx_vsn, get_vsn()}]}
]}
, {erl_opts, [{compile_info, [{emqx_vsn, get_vsn()}]}]}
]}
] ++ community_plugin_overrides().
@ -96,7 +94,7 @@ plugins(HasElixir) ->
test_plugins() ->
[ rebar3_proper,
{coveralls, {git, "https://github.com/emqx/coveralls-erl", {branch, "github"}}}
{coveralls, {git, "https://github.com/emqx/coveralls-erl", {branch, "fix-git-info"}}}
].
test_deps() ->
@ -107,30 +105,37 @@ test_deps() ->
common_compile_opts() ->
[ debug_info % alwyas include debug_info
, deterministic
, {compile_info, [{emqx_vsn, get_vsn()}]}
| [{d, 'EMQX_ENTERPRISE'} || is_enterprise()]
].
prod_compile_opts() ->
[ compressed
, deterministic
, warnings_as_errors
| common_compile_opts()
].
prod_overrides() ->
[{add, [ {erl_opts, [deterministic]}]}].
profiles() ->
Vsn = get_vsn(),
[ {'emqx', [ {erl_opts, prod_compile_opts()}
, {relx, relx(Vsn, cloud, bin)}
, {overrides, prod_overrides()}
]}
, {'emqx-pkg', [ {erl_opts, prod_compile_opts()}
, {relx, relx(Vsn, cloud, pkg)}
, {overrides, prod_overrides()}
]}
, {'emqx-edge', [ {erl_opts, prod_compile_opts()}
, {relx, relx(Vsn, edge, bin)}
, {overrides, prod_overrides()}
]}
, {'emqx-edge-pkg', [ {erl_opts, prod_compile_opts()}
, {relx, relx(Vsn, edge, pkg)}
, {overrides, prod_overrides()}
]}
, {check, [ {erl_opts, common_compile_opts()}
]}
@ -239,13 +244,20 @@ relx_apps(ReleaseType) ->
++ [{N, load} || N <- relx_plugin_apps(ReleaseType)].
relx_apps_per_rel(cloud) ->
[ {observer, load}
, luerl
[ luerl
, xmerl
| [{observer, load} || is_app(observer)]
];
relx_apps_per_rel(edge) ->
[].
is_app(Name) ->
case application:load(Name) of
ok -> true;
{error,{already_loaded, _}} -> true;
_ -> false
end.
relx_plugin_apps(ReleaseType) ->
[ emqx_retainer
, emqx_management
@ -444,7 +456,6 @@ coveralls() ->
Cfgs = [{coveralls_repo_token, Token},
{coveralls_service_job_id, os:getenv("GITHUB_RUN_ID")},
{coveralls_commit_sha, os:getenv("GITHUB_SHA")},
{coveralls_service_number, os:getenv("GITHUB_RUN_NUMBER")},
{coveralls_coverdata, "_build/test/cover/*.coverdata"},
{coveralls_service_name, "github"}],
case os:getenv("GITHUB_EVENT_NAME") =:= "pull_request"

View File

@ -234,10 +234,20 @@ shutdown(Reason) ->
?LOG(critical, "emqx shutdown for ~s", [Reason]),
_ = emqx_alarm_handler:unload(),
_ = emqx_plugins:unload(),
lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]).
lists:foreach(fun application:stop/1
, lists:reverse(default_started_applications())
).
reboot() ->
lists:foreach(fun application:start/1, [gproc, esockd, ranch, cowboy, ekka, emqx]).
lists:foreach(fun application:start/1 , default_started_applications()).
-ifdef(EMQX_ENTERPRISE).
default_started_applications() ->
[gproc, esockd, ranch, cowboy, ekka, emqx].
-else.
default_started_applications() ->
[gproc, esockd, ranch, cowboy, ekka, emqx, emqx_modules].
-endif.
%%--------------------------------------------------------------------
%% Internal functions

View File

@ -379,7 +379,7 @@ normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) ->
list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info]));
normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
list_to_binary(io_lib:format("connection congested: ~s", [Info]));
normalize_message(_Name, _UnknownDetails) ->
<<"Unknown alarm">>.

View File

@ -74,8 +74,13 @@ print_otp_version_warning() ->
print_banner() ->
io:format("Starting ~s on node ~s~n", [?APP, node()]).
-ifndef(TEST).
print_vsn() ->
io:format("~s ~s is running now!~n", [get_description(), get_release()]).
-else.
print_vsn() ->
ok.
-endif.
get_description() ->
{ok, Descr0} = application:get_key(?APP, description),

View File

@ -48,6 +48,9 @@
, terminate/2
]).
%% Export for emqx_sn
-export([do_deliver/2]).
%% Exports for CT
-export([set_field/3]).

View File

@ -16,22 +16,16 @@
-module(emqx_congestion).
-export([ maybe_alarm_port_busy/3
, maybe_alarm_port_busy/4
, maybe_alarm_too_many_publish/5
, maybe_alarm_too_many_publish/6
-export([ maybe_alarm_conn_congestion/3
, cancel_alarms/3
]).
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_congestion]}}]).
-define(ALARM_CONN_CONGEST(Channel, Reason),
list_to_binary(
io_lib:format("mqtt_conn/congested/~s/~s/~s",
[emqx_channel:info(clientid, Channel),
io_lib:format("~s/~s/~s",
[Reason, emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel),
<<"undefined">>),
Reason]))).
<<"unknown_user">>)]))).
-define(ALARM_CONN_INFO_KEYS, [socktype, sockname, peername, clientid, username,
proto_name, proto_ver, connected_at, conn_state]).
@ -39,44 +33,28 @@
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
-define(ALARM_SENT(REASON), {alarm_sent, REASON}).
-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]).
-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}).
-define(CONFIRM_CLEAR_INTERVAL, 10000).
-define(ALL_ALARM_REASONS, [conn_congestion]).
-define(WONT_CLEAR_IN, 60000).
maybe_alarm_port_busy(Socket, Transport, Channel) ->
maybe_alarm_port_busy(Socket, Transport, Channel, false).
maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) ->
case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, port_busy);
false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy,
ForceClear)
maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
case is_alarm_enabled(Channel) of
false -> ok;
true ->
case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, conn_congestion);
false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion)
end
end.
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize) ->
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize, false).
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
PubMsgCount = _MaxBatchSize, _ForceClear) ->
%% we only alarm it when the process is "too busy"
alarm_congestion(Socket, Transport, Channel, too_many_publish);
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
_MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
%% but we clear the alarm until it is really "idle", to avoid sending
%% alarms and clears too frequently
cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
ForceClear);
maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
_MaxBatchSize, _ForceClear) ->
ok.
cancel_alarms(Socket, Transport, Channel) ->
lists:foreach(fun(Reason) ->
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
end, ?ALL_ALARM_REASONS).
is_alarm_enabled(Channel) ->
emqx_zone:get_env(emqx_channel:info(zone, Channel),
conn_congestion_alarm_enabled, false).
alarm_congestion(Socket, Transport, Channel, Reason) ->
case has_alarm_sent(Reason) of
false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
@ -85,8 +63,11 @@ alarm_congestion(Socket, Transport, Channel, Reason) ->
update_alarm_sent_at(Reason)
end.
cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) ->
case is_alarm_allowed_clear(Reason, ForceClear) of
cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
Zone = emqx_channel:info(zone, Channel),
WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration,
?WONT_CLEAR_IN),
case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
false -> ok
end.
@ -125,14 +106,11 @@ get_alarm_sent_at(Reason) ->
undefined -> 0;
LastSentAt -> LastSentAt
end.
is_alarm_allowed_clear(Reason, _ForceClear = true) ->
has_alarm_sent(Reason);
is_alarm_allowed_clear(Reason, _ForceClear = false) ->
long_time_since_last_alarm(Reason, WontClearIn) ->
%% only sent clears when the alarm was not triggered in the last
%% ?CONFIRM_CLEAR_INTERVAL time
%% WontClearIn time
case timenow() - get_alarm_sent_at(Reason) of
Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true;
Elapse when Elapse >= WontClearIn -> true;
_ -> false
end.

View File

@ -390,12 +390,8 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active_n = MaxBatchSize, transport = Transport,
socket = Socket, channel = Channel} = State) ->
Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
length(Delivers0), MaxBatchSize),
Delivers = [Deliver|Delivers0],
#state{active_n = ActiveN} = State) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
@ -509,12 +505,9 @@ handle_timeout(_TRef, limit_timeout, State) ->
},
handle_info(activate_socket, NState);
handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize,
channel = Channel, transport = Transport, socket = Socket}) ->
{_, MsgQLen} = erlang:process_info(self(), message_queue_len),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
MsgQLen, MaxBatchSize, true),
handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport,
socket = Socket}) ->
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_stats(ClientId, stats(State)),
{ok, State#state{stats_timer = undefined}};
@ -627,7 +620,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct),
inc_counter(outgoing_bytes, Oct),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel),
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, [nosuspend]) of
ok -> ok;
Error = {error, _Reason} ->

View File

@ -106,14 +106,21 @@ format_listen_on(ListenOn) -> format(ListenOn).
start_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Options}) ->
ID = identifier(Proto, Name),
case start_listener(Proto, ListenOn, Options) of
{ok, _} -> io:format("Start ~s listener on ~s successfully.~n",
[ID, format(ListenOn)]);
{ok, _} ->
console_print("Start ~s listener on ~s successfully.~n", [ID, format(ListenOn)]);
{error, Reason} ->
io:format(standard_error, "Failed to start mqtt listener ~s on ~s: ~0p~n",
[ID, format(ListenOn), Reason]),
error(Reason)
end.
-ifndef(TEST).
console_print(Fmt, Args) ->
io:format(Fmt, Args).
-else.
console_print(_Fmt, _Args) -> ok.
-endif.
%% Start MQTT/TCP listener
-spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
-> {ok, pid()} | {error, term()}).

View File

@ -232,7 +232,8 @@ generate_configs(App) ->
{_, true} ->
Schema = cuttlefish_schema:files([SchemaFile]),
Conf = cuttlefish_conf:file(ConfFile),
cuttlefish_generator:map(Schema, Conf);
LogFun = fun(Key, Value) -> ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]) end,
cuttlefish_generator:map(Schema, Conf, undefined, LogFun);
{false, false} ->
error({config_not_found, {ConfigFile, ConfFile, SchemaFile}})
end.

View File

@ -414,8 +414,8 @@ t_oom_shutdown(_) ->
with_conn(
fun(Pid) ->
Pid ! {tcp_passive, foo},
?block_until(#{?snk_kind := check_oom}, 100),
?block_until(#{?snk_kind := terminate}, 10),
{ok, _} = ?block_until(#{?snk_kind := check_oom}, 1000),
{ok, _} = ?block_until(#{?snk_kind := terminate}, 100),
Trace = snabbkaffe:collect_trace(),
?assertEqual(1, length(?of_kind(terminate, Trace))),
receive

View File

@ -113,7 +113,7 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) ->
ct:pal("OK - received msg: ~p~n", [Info])
after
1000 ->
ct:fail("flase")
ct:fail(timeout)
end,
emqtt:stop(C).

View File

@ -86,8 +86,7 @@ init_per_testcase(TestCase, Config) when
init_per_testcase(_, Config) ->
Config.
end_per_testcase(TestCase, Config) when
end_per_testcase(TestCase, _Config) when
TestCase =/= t_ws_sub_protocols_mqtt_equivalents,
TestCase =/= t_ws_sub_protocols_mqtt,
TestCase =/= t_ws_check_origin,