Merge pull request #4286 from z8674558/fix-merge-conflict-master-to-5.0

Fix merge conflict master to 5.0
Yudai Kiyofuji 2021-03-04 22:40:36 +09:00 committed by GitHub
commit aa3c650cdd
88 changed files with 1369 additions and 1230 deletions

.ci/apps_tests/conf.env Normal file

@ -0,0 +1,12 @@
EMQX_AUTH__LDAP__SERVERS=ldap_server
EMQX_AUTH__MONGO__SERVER=mongo_server:27017
EMQX_AUTH__REDIS__SERVER=redis_server:6379
EMQX_AUTH__MYSQL__SERVER=mysql_server:3306
EMQX_AUTH__MYSQL__USERNAME=root
EMQX_AUTH__MYSQL__PASSWORD=public
EMQX_AUTH__MYSQL__DATABASE=mqtt
EMQX_AUTH__PGSQL__SERVER=pgsql_server:5432
EMQX_AUTH__PGSQL__USERNAME=root
EMQX_AUTH__PGSQL__PASSWORD=public
EMQX_AUTH__PGSQL__DATABASE=mqtt
CUTTLEFISH_ENV_OVERRIDE_PREFIX=EMQX_
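
A minimal sketch of how this file is expected to take effect, assuming the standard cuttlefish environment-override behaviour in EMQX 4.x (prefix EMQX_, double underscore mapping to a dot in the config key):

# With CUTTLEFISH_ENV_OVERRIDE_PREFIX=EMQX_ set, a variable such as
export EMQX_AUTH__MYSQL__SERVER=mysql_server:3306
# should override the config entry
#   auth.mysql.server = mysql_server:3306
# at boot, which is why the test containers can pick everything up from this
# env file instead of exporting each value inside the workflow steps.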


@ -12,6 +12,8 @@ services:
- ldap_server
networks:
- emqx_bridge
env_file:
- conf.env
environment:
GITHUB_ACTIONS: ${GITHUB_ACTIONS}
GITHUB_TOKEN: ${GITHUB_TOKEN}


@ -1,62 +0,0 @@
#!/usr/bin/env bash
#
# Author: Stefan Buck
# License: MIT
# https://gist.github.com/stefanbuck/ce788fee19ab6eb0b4447a85fc99f447
#
#
# This script accepts the following parameters:
#
# * owner
# * repo
# * tag
# * filename
# * github_api_token
#
# Script to upload a release asset using the GitHub API v3.
#
# Example:
#
# upload-github-release-asset.sh github_api_token=TOKEN owner=stefanbuck repo=playground tag=v0.1.0 filename=./build.zip
#
# Check dependencies.
set -e
xargs=$(which gxargs || which xargs)
# Validate settings.
[ "$TRACE" ] && set -x
CONFIG=$@
for line in $CONFIG; do
eval "$line"
done
# Define variables.
GH_API="https://api.github.com"
GH_REPO="$GH_API/repos/$owner/$repo"
GH_TAGS="$GH_REPO/releases/tags/$tag"
AUTH="Authorization: token $github_api_token"
WGET_ARGS="--content-disposition --auth-no-challenge --no-cookie"
CURL_ARGS="-LJO#"
if [[ "$tag" == 'LATEST' ]]; then
GH_TAGS="$GH_REPO/releases/latest"
fi
# Validate token.
curl -o /dev/null -sH "$AUTH" $GH_REPO || { echo "Error: Invalid repo, token or network issue!"; exit 1; }
# Read asset tags.
response=$(curl -sH "$AUTH" $GH_TAGS)
# Get ID of the asset based on given filename.
eval $(echo "$response" | grep -m 1 "id.:" | grep -w id | tr : = | tr -cd '[[:alnum:]]=')
[ "$id" ] || { echo "Error: Failed to get release id for tag: $tag"; echo "$response" | awk 'length($0)<100' >&2; exit 1; }
# Upload asset
# Construct url
GH_ASSET="https://uploads.github.com/repos/$owner/$repo/releases/$id/assets?name=$(basename $filename)"
curl "$GITHUB_OAUTH_BASIC" --data-binary @"$filename" -H "Authorization: token $github_api_token" -H "Content-Type: application/octet-stream" $GH_ASSET


@ -1,23 +1,19 @@
name: Cross build packages
on:
schedule:
- cron: '0 */6 * * *'
push:
tags:
- v*
release:
types:
- published
pull_request:
workflow_dispatch:
repository_dispatch:
types: [run_actions]
jobs:
windows:
runs-on: windows-2019
if: startsWith(github.ref, 'refs/tags/')
steps:
- uses: actions/checkout@v1
- uses: ilammy/msvc-dev-cmd@v1
@ -73,8 +69,6 @@ jobs:
mac:
runs-on: macos-10.15
if: startsWith(github.ref, 'refs/tags/')
steps:
- uses: actions/checkout@v1
- name: prepare
@ -205,7 +199,6 @@ jobs:
done
cd -
- name: build emqx packages
if: (matrix.arch == 'amd64' && matrix.emqx == 'emqx') || startsWith(github.ref, 'refs/tags/')
env:
ERL_OTP: erl23.2.2
EMQX: ${{ matrix.emqx }}
@ -248,8 +241,6 @@ jobs:
docker:
runs-on: ubuntu-20.04
if: startsWith(github.ref, 'refs/tags/')
strategy:
matrix:
arch:
@ -261,17 +252,6 @@ jobs:
steps:
- uses: actions/checkout@v1
- name: get deps
env:
ERL_OTP: erl23.2.2
run: |
docker run -i --rm \
-e GITHUB_RUN_ID=$GITHUB_RUN_ID \
-e GITHUB_REF=$GITHUB_REF \
-v $(pwd):/emqx \
-w /emqx \
emqx/build-env:${ERL_OTP}-alpine-amd64 \
sh -c "make deps-emqx"
- name: build emqx docker image
env:
ARCH: ${{ matrix.arch[0] }}
@ -301,6 +281,11 @@ jobs:
if: startsWith(github.ref, 'refs/tags/')
steps:
- name: get_version
run: |
echo 'version<<EOF' >> $GITHUB_ENV
echo ${{ github.ref }} | sed -r "s ^refs/heads/|^refs/tags/(.*) \1 g" >> $GITHUB_ENV
echo 'EOF' >> $GITHUB_ENV
- uses: actions/download-artifact@v2
with:
name: emqx
@ -325,17 +310,13 @@ jobs:
- name: upload aws s3
run: |
set -e -x -u
version=$(echo ${{ github.ref }} | sed -r "s ^refs/heads/|^refs/tags/(.*) \1 g")
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip -q awscliv2.zip
sudo ./aws/install
aws configure set aws_access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
aws configure set aws_secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws configure set default.region us-west-2
aws s3 cp --recursive _packages/emqx s3://packages.emqx/emqx-ce/$version
aws s3 cp --recursive _packages/emqx-edge s3://packages.emqx/emqx-edge/$version
aws cloudfront create-invalidation --distribution-id E170YEULGLT8XB --paths "/emqx-ce/$version/*,/emqx-edge/$version/*"
aws s3 cp --recursive _packages/emqx s3://packages.emqx/emqx-ce/${{ env.version }}
aws s3 cp --recursive _packages/emqx-edge s3://packages.emqx/emqx-edge/${{ env.version }}
aws cloudfront create-invalidation --distribution-id E170YEULGLT8XB --paths "/emqx-ce/${{ env.version }}/*,/emqx-edge/${{ env.version }}/*"
mkdir packages
mv _packages/emqx/* packages
@ -343,28 +324,27 @@ jobs:
- uses: actions/checkout@v2
with:
path: emqx
- name: update to github and emqx.io
- uses: Rory-Z/upload-release-asset@v1
if: github.event_name == 'release'
with:
repo: emqx
path: "packages/emqx-*"
token: ${{ github.token }}
- name: update to emqx.io
if: github.event_name == 'release'
run: |
set -e -x -u
version=$(echo ${{ github.ref }} | sed -r "s ^refs/heads/|^refs/tags/(.*) \1 g")
cd packages
for var in $(ls); do
../emqx/.ci/build_packages/upload_github_release_asset.sh owner=emqx repo=emqx tag=$version filename=$var github_api_token=$(echo ${{ secrets.AccessToken }})
sleep 1
done
curl -w %{http_code} \
--insecure \
-H "Content-Type: application/json" \
-H "token: ${{ secrets.EMQX_IO_TOKEN }}" \
-X POST \
-d "{\"repo\":\"emqx/emqx\", \"tag\": \"${version}\" }" \
-d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ env.version }}\" }" \
${{ secrets.EMQX_IO_RELEASE_API }}
- name: push docker image to docker hub
if: github.event_name == 'release'
run: |
set -e -x -u
version=$(echo ${{ github.ref }} | sed -r "s ^refs/heads/|^refs/tags/(.*) \1 g")
sudo make -C emqx docker-prepare
cd packages && for var in $(ls |grep docker |grep -v sha256); do unzip $var; sudo docker load < ${var%.*}; rm -f ${var%.*}; done && cd -
echo ${{ secrets.DOCKER_HUB_TOKEN }} |sudo docker login -u ${{ secrets.DOCKER_HUB_USER }} --password-stdin
@ -375,23 +355,26 @@ jobs:
- name: update repo.emqx.io
if: github.event_name == 'release'
run: |
set -e -x -u
version=$(echo ${{ github.ref }} | sed -r "s ^refs/heads/|^refs/tags/(.*) \1 g")
curl \
curl --silent --show-error \
-H "Authorization: token ${{ secrets.AccessToken }}" \
-H "Accept: application/vnd.github.v3+json" \
-X POST \
-d "{\"ref\":\"v1.0.0\",\"inputs\":{\"version\": \"${version}\", \"emqx_ce\": \"true\"}}" \
https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_repos.yaml/dispatches
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
- name: update homebrew packages
if: github.event_name == 'release'
run: |
if [ -z $(echo $version | grep -oE "(alpha|beta|rc)\.[0-9]") ]; then
curl --silent --show-error \
-H "Authorization: token ${{ secrets.AccessToken }}" \
-H "Accept: application/vnd.github.v3+json" \
-X POST \
-d "{\"ref\":\"v1.0.1\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_homebrew.yaml/dispatches"
fi
- uses: geekyeggo/delete-artifact@v1
with:
name: emqx
- uses: geekyeggo/delete-artifact@v1
with:
name: emqx-edge
# - name: update homebrew packages
# run: |
# version=$(echo ${{ github.ref }} | sed -r "s .*/.*/(.*) \1 g")
# if [ ! -z $(echo $version | grep -oE "v[0-9]+\.[0-9]+(\.[0-9]+)?") ] && [ -z $(echo $version | grep -oE "(alpha|beta|rc)\.[0-9]") ]; then
# curl -H "Authorization: token ${{ secrets.AccessToken }}" -H "Accept: application/vnd.github.everest-preview+json" -H "Content-Type: application/json" -X POST -d "{\"event_type\":\"update_homebrew\",\"client_payload\":{\"version\": \"$version\"}}" https://api.github.com/repos/emqx/emqx-packages-docker/dispatches
# fi
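
The get_version step earlier in this workflow writes a multi-line value into $GITHUB_ENV so that later steps can read ${{ env.version }} instead of re-running sed. A standalone sketch of that mechanism, with an illustrative ref value and a temp file standing in for the runner-provided GITHUB_ENV:

GITHUB_ENV=$(mktemp)               # provided by the Actions runner in CI
GITHUB_REF=refs/tags/v4.3.0        # illustrative value
{
  echo 'version<<EOF'
  echo "$GITHUB_REF" | sed -r "s ^refs/heads/|^refs/tags/(.*) \1 g"
  echo 'EOF'
} >> "$GITHUB_ENV"
cat "$GITHUB_ENV"                  # version<<EOF / v4.3.0 / EOF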


@ -0,0 +1,13 @@
name: Check Rebar Dependencies
on: [pull_request]
jobs:
check_deps_integrity:
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.2-ubuntu20.04
steps:
- uses: actions/checkout@v2
- name: Run check-deps-integrity.escript
run: ./scripts/check-deps-integrity.escript


@ -8,9 +8,6 @@ on:
types:
- published
pull_request:
workflow_dispatch:
repository_dispatch:
types: [run_actions]
jobs:
ldap:


@ -8,9 +8,6 @@ on:
types:
- published
pull_request:
workflow_dispatch:
repository_dispatch:
types: [run_actions]
jobs:
docker_test:


@ -8,12 +8,20 @@ on:
types:
- published
pull_request:
workflow_dispatch:
repository_dispatch:
types: [run_actions]
jobs:
run_test_case:
run_static_analysis:
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.2-ubuntu20.04
steps:
- uses: actions/checkout@v2
- name: xref
run: make xref
- name: dialyzer
run: make dialyzer
run_common_test:
runs-on: ubuntu-20.04
steps:
@ -29,25 +37,12 @@ jobs:
run: |
docker-compose -f .ci/apps_tests/docker-compose.yaml build --no-cache
docker-compose -f .ci/apps_tests/docker-compose.yaml up -d
- name: run tests
- name: run eunit
run: docker exec -i erlang bash -c "make eunit"
- name: run common test
run: docker exec -i erlang bash -c "make ct"
- name: run cover
run: |
export EMQX_AUTH__LDAP__SERVERS=ldap_server \
EMQX_AUTH__MONGO__SERVER=mongo_server:27017 \
EMQX_AUTH__REDIS__SERVER=redis_server:6379 \
EMQX_AUTH__MYSQL__SERVER=mysql_server:3306 \
EMQX_AUTH__MYSQL__USERNAME=root \
EMQX_AUTH__MYSQL__PASSWORD=public \
EMQX_AUTH__MYSQL__DATABASE=mqtt \
EMQX_AUTH__PGSQL__SERVER=pgsql_server:5432 \
EMQX_AUTH__PGSQL__USERNAME=root \
EMQX_AUTH__PGSQL__PASSWORD=public \
EMQX_AUTH__PGSQL__DATABASE=mqtt \
CUTTLEFISH_ENV_OVERRIDE_PREFIX=EMQX_
printenv > .env
docker exec -i erlang bash -c "make xref"
docker exec --env-file .env -i erlang bash -c "make ct"
docker exec --env-file .env -i erlang bash -c "make eunit"
docker exec -i erlang bash -c "make cover"
docker exec -i erlang bash -c "make coveralls"
- uses: actions/upload-artifact@v1
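
Because conf.env is now injected through docker-compose (env_file) rather than exported per step, the job above can be approximated locally without the old printenv/.env handoff; the compose file path and the "erlang" container name are taken from the workflow itself:

docker-compose -f .ci/apps_tests/docker-compose.yaml up -d
docker exec -i erlang bash -c "make eunit"
docker exec -i erlang bash -c "make ct"
docker exec -i erlang bash -c "make cover"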

.gitignore vendored

@ -44,3 +44,4 @@ elvis
emqx_dialyzer_*_plt
*/emqx_dashboard/priv/www
dist.zip
scripts/git-token


@ -3,7 +3,6 @@ DASHBOARD_VERSION = v4.3.0-beta.1
REBAR = $(CURDIR)/rebar3
BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts
export EMQX_ENTERPRISE=false
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
PROFILE ?= emqx


@ -250,8 +250,4 @@ uri(Parts) when is_list(Parts) ->
NParts = [b2l(E) || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION, "acl"| NParts]).
%% @private
b2l(B) when is_binary(B) ->
http_uri:encode(binary_to_list(B));
b2l(L) when is_list(L) ->
http_uri:encode(L).
b2l(B) -> binary_to_list(emqx_http_lib:uri_encode(iolist_to_binary(B))).


@ -39,7 +39,6 @@
]).
-define(RESOURCE_TYPE_MQTT, 'bridge_mqtt').
-define(RESOURCE_TYPE_MQTT_SUB, 'bridge_mqtt_sub').
-define(RESOURCE_TYPE_RPC, 'bridge_rpc').
-define(RESOURCE_CONFIG_SPEC_MQTT, #{
@ -94,7 +93,7 @@
},
password => #{
order => 6,
type => string,
type => password,
required => false,
default => <<"">>,
title => #{en => <<"Password">>,
@ -111,7 +110,7 @@
zh => <<"桥接挂载点"/utf8>>},
description => #{
en => <<"MountPoint for bridge topic:<br/>"
"Example: The topic of messages sent to `topic1` on local node"
"Example: The topic of messages sent to `topic1` on local node "
"will be transformed to `bridge/aws/${node}/topic1`">>,
zh => <<"桥接主题的挂载点:<br/>"
"示例: 本地节点向 `topic1` 发消息,远程桥接节点的主题"
@ -126,8 +125,8 @@
enum => [<<"on">>, <<"off">>],
title => #{en => <<"Disk Cache">>,
zh => <<"磁盘缓存"/utf8>>},
description => #{en => <<"The flag which determines whether messages"
"can be cached on local disk when bridge is"
description => #{en => <<"The flag which determines whether messages "
"can be cached on local disk when bridge is "
"disconnected">>,
zh => <<"当桥接断开时用于控制是否将消息缓存到本地磁"
"盘队列上"/utf8>>}
@ -244,183 +243,6 @@
}
}).
-define(RESOURCE_CONFIG_SPEC_MQTT_SUB, #{
address => #{
order => 1,
type => string,
required => true,
default => <<"127.0.0.1:1883">>,
title => #{en => <<" Broker Address">>,
zh => <<"远程 broker 地址"/utf8>>},
description => #{en => <<"The MQTT Remote Address">>,
zh => <<"远程 MQTT Broker 的地址"/utf8>>}
},
pool_size => #{
order => 2,
type => number,
required => true,
default => 8,
title => #{en => <<"Pool Size">>,
zh => <<"连接池大小"/utf8>>},
description => #{en => <<"MQTT Connection Pool Size">>,
zh => <<"连接池大小"/utf8>>}
},
clientid => #{
order => 3,
type => string,
required => true,
default => <<"client">>,
title => #{en => <<"ClientId">>,
zh => <<"客户端 Id"/utf8>>},
description => #{en => <<"ClientId for connecting to remote MQTT broker">>,
zh => <<"连接远程 Broker 的 ClientId"/utf8>>}
},
append => #{
order => 4,
type => boolean,
required => true,
default => true,
title => #{en => <<"Append GUID">>,
zh => <<"附加 GUID"/utf8>>},
description => #{en => <<"Append GUID to MQTT ClientId?">>,
zh => <<"是否将GUID附加到 MQTT ClientId 后"/utf8>>}
},
username => #{
order => 5,
type => string,
required => false,
default => <<"">>,
title => #{en => <<"Username">>, zh => <<"用户名"/utf8>>},
description => #{en => <<"Username for connecting to remote MQTT Broker">>,
zh => <<"连接远程 Broker 的用户名"/utf8>>}
},
password => #{
order => 6,
type => string,
required => false,
default => <<"">>,
title => #{en => <<"Password">>,
zh => <<"密码"/utf8>>},
description => #{en => <<"Password for connecting to remote MQTT Broker">>,
zh => <<"连接远程 Broker 的密码"/utf8>>}
},
subscription_opts => #{
order => 7,
type => array,
items => #{
type => object,
schema => #{
topic => #{
order => 1,
type => string,
default => <<>>,
title => #{en => <<"MQTT Topic">>,
zh => <<"MQTT 主题"/utf8>>},
description => #{en => <<"MQTT Topic">>,
zh => <<"MQTT 主题"/utf8>>}
},
qos => #{
order => 2,
type => number,
enum => [0, 1, 2],
default => 0,
title => #{en => <<"MQTT Topic QoS">>,
zh => <<"MQTT 服务质量"/utf8>>},
description => #{en => <<"MQTT Topic QoS">>,
zh => <<"MQTT 服务质量"/utf8>>}
}
}
},
default => [],
title => #{en => <<"Subscription Opts">>,
zh => <<"订阅选项"/utf8>>},
description => #{en => <<"Subscription Opts">>,
zh => <<"订阅选项"/utf8>>}
},
proto_ver => #{
order => 8,
type => string,
required => false,
default => <<"mqttv4">>,
enum => [<<"mqttv3">>, <<"mqttv4">>, <<"mqttv5">>],
title => #{en => <<"Protocol Version">>,
zh => <<"协议版本"/utf8>>},
description => #{en => <<"MQTTT Protocol version">>,
zh => <<"MQTT 协议版本"/utf8>>}
},
keepalive => #{
order => 9,
type => string,
required => false,
default => <<"60s">> ,
title => #{en => <<"Keepalive">>,
zh => <<"心跳间隔"/utf8>>},
description => #{en => <<"Keepalive">>,
zh => <<"心跳间隔"/utf8>>}
},
reconnect_interval => #{
order => 10,
type => string,
required => false,
default => <<"30s">>,
title => #{en => <<"Reconnect Interval">>,
zh => <<"重连间隔"/utf8>>},
description => #{en => <<"Reconnect interval of bridge">>,
zh => <<"重连间隔"/utf8>>}
},
ssl => #{
order => 11,
type => string,
required => false,
default => <<"off">>,
enum => [<<"on">>, <<"off">>],
title => #{en => <<"Bridge SSL">>,
zh => <<"Bridge SSL"/utf8>>},
description => #{en => <<"Switch which used to enable ssl connection of the bridge">>,
zh => <<"是否启用 Bridge SSL 连接"/utf8>>}
},
cacertfile => #{
order => 12,
type => string,
required => false,
default => <<"etc/certs/cacert.pem">>,
title => #{en => <<"CA certificates">>,
zh => <<"CA 证书"/utf8>>},
description => #{en => <<"The file path of the CA certificates">>,
zh => <<"CA 证书路径"/utf8>>}
},
certfile => #{
order => 13,
type => string,
required => false,
default => <<"etc/certs/client-cert.pem">>,
title => #{en => <<"SSL Certfile">>,
zh => <<"SSL 客户端证书"/utf8>>},
description => #{en => <<"The file path of the client certfile">>,
zh => <<"客户端证书路径"/utf8>>}
},
keyfile => #{
order => 14,
type => string,
required => false,
default => <<"etc/certs/client-key.pem">>,
title => #{en => <<"SSL Keyfile">>,
zh => <<"SSL 密钥文件"/utf8>>},
description => #{en => <<"The file path of the client keyfile">>,
zh => <<"客户端密钥路径"/utf8>>}
},
ciphers => #{
order => 15,
type => string,
required => false,
default => <<"ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384">>,
title => #{en => <<"SSL Ciphers">>,
zh => <<"SSL 加密算法"/utf8>>},
description => #{en => <<"SSL Ciphers">>,
zh => <<"SSL 加密算法"/utf8>>}
}
}).
-define(RESOURCE_CONFIG_SPEC_RPC, #{
address => #{
order => 1,
@ -440,7 +262,7 @@
title => #{en => <<"Bridge MountPoint">>,
zh => <<"桥接挂载点"/utf8>>},
description => #{en => <<"MountPoint for bridge topic<br/>"
"Example: The topic of messages sent to `topic1` on local node"
"Example: The topic of messages sent to `topic1` on local node "
"will be transformed to `bridge/aws/${node}/topic1`">>,
zh => <<"桥接主题的挂载点<br/>"
"示例: 本地节点向 `topic1` 发消息,远程桥接节点的主题"
@ -484,8 +306,8 @@
enum => [<<"on">>, <<"off">>],
title => #{en => <<"Disk Cache">>,
zh => <<"磁盘缓存"/utf8>>},
description => #{en => <<"The flag which determines whether messages"
"can be cached on local disk when bridge is"
description => #{en => <<"The flag which determines whether messages "
"can be cached on local disk when bridge is "
"disconnected">>,
zh => <<"当桥接断开时用于控制是否将消息缓存到本地磁"
"盘队列上"/utf8>>}
@ -510,15 +332,6 @@
description => #{en => <<"MQTT Message Bridge">>, zh => <<"MQTT 消息桥接"/utf8>>}
}).
-resource_type(#{
name => ?RESOURCE_TYPE_MQTT_SUB,
create => on_resource_create,
status => on_get_resource_status,
destroy => on_resource_destroy,
params => ?RESOURCE_CONFIG_SPEC_MQTT_SUB,
title => #{en => <<"MQTT Subscribe">>, zh => <<"MQTT Subscribe"/utf8>>},
description => #{en => <<"MQTT Subscribe">>, zh => <<"MQTT 订阅消息"/utf8>>}
}).
-resource_type(#{
name => ?RESOURCE_TYPE_RPC,
@ -544,7 +357,8 @@
default => <<"">>,
title => #{en => <<"Forward Topic">>,
zh => <<"转发消息主题"/utf8>>},
description => #{en => <<"The topic used when forwarding the message. Defaults to the topic of the bridge message if not provided.">>,
description => #{en => <<"The topic used when forwarding the message. "
"Defaults to the topic of the bridge message if not provided.">>,
zh => <<"转发消息时使用的主题。如果未提供,则默认为桥接消息的主题。"/utf8>>}
},
payload_tmpl => #{
@ -555,8 +369,11 @@
default => <<"">>,
title => #{en => <<"Payload Template">>,
zh => <<"消息内容模板"/utf8>>},
description => #{en => <<"The payload template, variable interpolation is supported. If using empty template (default), then the payload will be all the available vars in JSON format">>,
zh => <<"消息内容模板,支持变量。若使用空模板(默认),消息内容为 JSON 格式的所有字段"/utf8>>}
description => #{en => <<"The payload template, variable interpolation is supported. "
"If using empty template (default), then the payload will be "
"all the available vars in JSON format">>,
zh => <<"消息内容模板,支持变量。"
"若使用空模板(默认),消息内容为 JSON 格式的所有字段"/utf8>>}
}
},
title => #{en => <<"Data bridge to MQTT Broker">>,
@ -733,12 +550,6 @@ options(Options, PoolName, ResId) ->
{connect_module, emqx_bridge_rpc},
{batch_size, Get(<<"batch_size">>)}];
false ->
Subscriptions = format_subscriptions(GetD(<<"subscription_opts">>, [])),
Subscriptions1 = case Get(<<"topic">>) of
undefined -> Subscriptions;
Topic ->
[{subscriptions, [{Topic, Get(<<"qos">>)}]} | Subscriptions]
end,
[{address, binary_to_list(Address)},
{bridge_mode, GetD(<<"bridge_mode">>, true)},
{clean_start, true},
@ -750,8 +561,7 @@ options(Options, PoolName, ResId) ->
{password, str(Get(<<"password">>))},
{proto_ver, mqtt_ver(Get(<<"proto_ver">>))},
{retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)}
| maybe_ssl(Options, cuttlefish_flag:parse(str(Get(<<"ssl">>))), ResId)
] ++ Subscriptions1
| maybe_ssl(Options, Get(<<"ssl">>), ResId)]
end.
maybe_ssl(_Options, false, _ResId) ->
@ -767,8 +577,3 @@ mqtt_ver(ProtoVer) ->
<<"mqttv5">> -> v5;
_ -> v4
end.
format_subscriptions(SubOpts) ->
lists:map(fun(Sub) ->
{maps:get(<<"topic">>, Sub), maps:get(<<"qos">>, Sub)}
end, SubOpts).


@ -55,6 +55,7 @@ to_export(emqx_bridge_mqtt, Mountpoint,
#mqtt_msg{qos = QoS,
retain = Retain,
topic = topic(Mountpoint, Topic),
props = #{},
payload = Payload};
to_export(_Module, Mountpoint,
#message{topic = Topic} = Msg) ->


@ -1,26 +1,4 @@
{deps,
[
{gen_coap, {git, "https://github.com/emqx/gen_coap", {tag, "v0.3.1"}}}
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps,
[{er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0"}}}]}
]}
{gen_coap, {git, "https://github.com/emqx/gen_coap", {tag, "v0.3.2"}}}
]}.


@ -241,7 +241,7 @@ handle_received_publish(Topic, MaxAge, Format, Payload) ->
handle_received_create(TopicPrefix, MaxAge, Payload) ->
case core_link:decode(Payload) of
[{rootless, [Topic], [{ct, CT}]}] when is_binary(Topic), Topic =/= <<>> ->
TrueTopic = percent_decode(Topic),
TrueTopic = emqx_http_lib:uri_decode(Topic),
?LOG(debug, "decoded link-format payload, the Topic=~p, CT=~p~n", [TrueTopic, CT]),
LocPath = concatenate_location_path([<<"ps">>, TopicPrefix, TrueTopic]),
FullTopic = binary:part(LocPath, 4, byte_size(LocPath)-4),
@ -259,14 +259,6 @@ handle_received_create(TopicPrefix, MaxAge, Payload) ->
{error, bad_request}
end.
%% @private Copy from http_uri.erl which has been deprecated since OTP-23
percent_decode(<<$%, Hex:2/binary, Rest/bits>>) ->
<<(binary_to_integer(Hex, 16)), (percent_decode(Rest))/binary>>;
percent_decode(<<First:1/binary, Rest/bits>>) ->
<<First/binary, (percent_decode(Rest))/binary>>;
percent_decode(<<>>) ->
<<>>.
%% When topic is timeout, server should return nocontent here,
%% but gen_coap only receive return value of #coap_content from coap_get, so temporarily we can't give the Code 2.07 {ok, nocontent} out.TBC!!!
return_resource(Topic, Payload, MaxAge, TimeStamp, Content) ->


@ -47,6 +47,9 @@
-define(LOG(Level, Format, Args), logger:Level("LWM2M-RESOURCE: " ++ Format, Args)).
-dialyzer([{nowarn_function, [coap_discover/2]}]).
% we use {'absolute', string(), [{atom(), binary()}]} as coap_uri()
% https://github.com/emqx/lwm2m-coap/blob/258e9bd3762124395e83c1e68a1583b84718230f/src/lwm2m_coap_resource.erl#L61
% resource operations
coap_discover(_Prefix, _Args) ->
[{absolute, "mqtt", []}].


@ -90,13 +90,13 @@ basename(OldBaseName, _ObjectId, ObjectInstanceId, _ResourceId, 2) ->
[ObjId, ObjInsId, _ResId] -> <<$/, ObjId/binary, $/, ObjInsId/binary>>;
[ObjId, ObjInsId] -> <<$/, ObjId/binary, $/, ObjInsId/binary>>;
[ObjId] -> <<$/, ObjId/binary, $/, (integer_to_binary(ObjectInstanceId))/binary>>
end;
basename(OldBaseName, _ObjectId, _ObjectInstanceId, _ResourceId, 1) ->
case binary:split(binary_util:trim(OldBaseName, $/), [<<$/>>], [global]) of
[ObjId, _ObjInsId, _ResId] -> <<$/, ObjId/binary>>;
[ObjId, _ObjInsId] -> <<$/, ObjId/binary>>;
[ObjId] -> <<$/, ObjId/binary>>
end.
% basename(OldBaseName, _ObjectId, _ObjectInstanceId, _ResourceId, 1) ->
% case binary:split(binary_util:trim(OldBaseName, $/), [<<$/>>], [global]) of
% [ObjId, _ObjInsId, _ResId] -> <<$/, ObjId/binary>>;
% [ObjId, _ObjInsId] -> <<$/, ObjId/binary>>;
% [ObjId] -> <<$/, ObjId/binary>>
% end.
make_path(RelativePath, Id) ->
<<RelativePath/binary, $/, (integer_to_binary(Id))/binary>>.
@ -187,7 +187,7 @@ insert(Level, #{<<"path">> := EleName, <<"type">> := Type, <<"value">> := Value}
case Level of
object -> insert_resource_into_object(Path, BinaryValue, Acc);
object_instance -> insert_resource_into_object_instance(Path, BinaryValue, Acc);
resource -> insert_resource_instance_into_resource(Path, BinaryValue, Acc)
resource -> insert_resource_instance_into_resource(hd(Path), BinaryValue, Acc)
end.
% json text to TLV binary


@ -121,7 +121,7 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
case proplists:get_value(update_msg_publish_condition,
_ = case proplists:get_value(update_msg_publish_condition,
lwm2m_coap_responder:options(), contains_object_list) of
always ->
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
@ -202,7 +202,6 @@ terminate(Reason, Lwm2mState) ->
?LOG(error, "process terminated: ~p, lwm2m_state: ~p", [Reason, Lwm2mState]).
clean_subscribe(_CoapPid, _Error, undefined, _Lwm2mState) -> ok;
clean_subscribe(_CoapPid, _Error, _SubTopic, undefined) -> ok;
clean_subscribe(CoapPid, {shutdown, Error}, SubTopic, Lwm2mState) ->
do_clean_subscribe(CoapPid, Error, SubTopic, Lwm2mState);
clean_subscribe(CoapPid, Error, SubTopic, Lwm2mState) ->
@ -407,8 +406,8 @@ clientinfo(#lwm2m_state{peername = {PeerHost, _},
peerhost => PeerHost,
sockport => 5683, %% FIXME:
clientid => EndpointName,
username => null,
password => null,
username => undefined,
password => undefined,
peercert => nossl,
is_bridge => false,
is_superuser => false,


@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_management_sup]},
{applications, [kernel,stdlib,minirest,emqx_modules]},
{applications, [kernel,stdlib,minirest]},
{mod, {emqx_mgmt_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},


@ -36,15 +36,6 @@
%% Metrics and Stats
-export([ get_metrics/0
, get_metrics/1
, get_all_topic_metrics/0
, get_topic_metrics/1
, get_topic_metrics/2
, register_topic_metrics/1
, register_topic_metrics/2
, unregister_topic_metrics/1
, unregister_topic_metrics/2
, unregister_all_topic_metrics/0
, unregister_all_topic_metrics/1
, get_stats/0
, get_stats/1
]).
@ -91,14 +82,6 @@
, reload_plugin/2
]).
%% Modules
-export([ list_modules/0
, list_modules/1
, load_module/2
, unload_module/2
, reload_module/2
]).
%% Listeners
-export([ list_listeners/0
, list_listeners/1
@ -118,26 +101,6 @@
, delete_banned/1
]).
%% Export/Import
-export([ export_rules/0
, export_resources/0
, export_blacklist/0
, export_applications/0
, export_users/0
, export_auth_mnesia/0
, export_acl_mnesia/0
, import_rules/1
, import_resources/1
, import_resources_and_rules/3
, import_blacklist/1
, import_applications/1
, import_users/1
, import_auth_clientid/1 %% BACKW: 4.1.x
, import_auth_username/1 %% BACKW: 4.1.x
, import_auth_mnesia/2
, import_acl_mnesia/2
, to_version/1
]).
-export([ enable_telemetry/0
, disable_telemetry/0
@ -217,73 +180,6 @@ get_metrics(Node) when Node =:= node() ->
get_metrics(Node) ->
rpc_call(Node, get_metrics, [Node]).
get_all_topic_metrics() ->
lists:foldl(fun(Topic, Acc) ->
case get_topic_metrics(Topic) of
{error, _Reason} ->
Acc;
Metrics ->
[#{topic => Topic, metrics => Metrics} | Acc]
end
end, [], emqx_mod_topic_metrics:all_registered_topics()).
get_topic_metrics(Topic) ->
lists:foldl(fun(Node, Acc) ->
case get_topic_metrics(Node, Topic) of
{error, _Reason} ->
Acc;
Metrics ->
case Acc of
[] -> Metrics;
_ ->
lists:foldl(fun({K, V}, Acc0) ->
[{K, V + proplists:get_value(K, Metrics, 0)} | Acc0]
end, [], Acc)
end
end
end, [], ekka_mnesia:running_nodes()).
get_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:metrics(Topic);
get_topic_metrics(Node, Topic) ->
rpc_call(Node, get_topic_metrics, [Node, Topic]).
register_topic_metrics(Topic) ->
Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
register_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:register(Topic);
register_topic_metrics(Node, Topic) ->
rpc_call(Node, register_topic_metrics, [Node, Topic]).
unregister_topic_metrics(Topic) ->
Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
unregister_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:unregister(Topic);
unregister_topic_metrics(Node, Topic) ->
rpc_call(Node, unregister_topic_metrics, [Node, Topic]).
unregister_all_topic_metrics() ->
Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
unregister_all_topic_metrics(Node) when Node =:= node() ->
emqx_mod_topic_metrics:unregister_all();
unregister_all_topic_metrics(Node) ->
rpc_call(Node, unregister_topic_metrics, [Node]).
get_stats() ->
[{Node, get_stats(Node)} || Node <- ekka_mnesia:running_nodes()].
@ -407,8 +303,8 @@ list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
M:F(ets:select(emqx_suboption, MatchSpec));
list_subscriptions_via_topic(Node, {topic, Topic}, FormatFun) ->
rpc_call(Node, list_subscriptions_via_topic, [Node, {topic, Topic}, FormatFun]).
list_subscriptions_via_topic(Node, Topic, FormatFun) ->
rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
lookup_subscriptions(ClientId) ->
lists:append([lookup_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()]).
@ -504,33 +400,6 @@ reload_plugin(Node, Plugin) when Node =:= node() ->
reload_plugin(Node, Plugin) ->
rpc_call(Node, reload_plugin, [Node, Plugin]).
%%--------------------------------------------------------------------
%% Modules
%%--------------------------------------------------------------------
list_modules() ->
[{Node, list_modules(Node)} || Node <- ekka_mnesia:running_nodes()].
list_modules(Node) when Node =:= node() ->
emqx_modules:list();
list_modules(Node) ->
rpc_call(Node, list_modules, [Node]).
load_module(Node, Module) when Node =:= node() ->
emqx_modules:load(Module);
load_module(Node, Module) ->
rpc_call(Node, load_module, [Node, Module]).
unload_module(Node, Module) when Node =:= node() ->
emqx_modules:unload(Module);
unload_module(Node, Module) ->
rpc_call(Node, unload_module, [Node, Module]).
reload_module(Node, Module) when Node =:= node() ->
emqx_modules:reload(Module);
reload_module(Node, Module) ->
rpc_call(Node, reload_module, [Node, Module]).
%%--------------------------------------------------------------------
%% Listeners
%%--------------------------------------------------------------------
@ -613,293 +482,7 @@ create_banned(Banned) ->
delete_banned(Who) ->
emqx_banned:delete(Who).
%%--------------------------------------------------------------------
%% Data Export and Import
%%--------------------------------------------------------------------
export_rules() ->
lists:map(fun({_, RuleId, _, RawSQL, _, _, _, _, _, _, Actions, Enabled, Desc}) ->
[{id, RuleId},
{rawsql, RawSQL},
{actions, actions_to_prop_list(Actions)},
{enabled, Enabled},
{description, Desc}]
end, emqx_rule_registry:get_rules()).
export_resources() ->
lists:map(fun({_, Id, Type, Config, CreatedAt, Desc}) ->
NCreatedAt = case CreatedAt of
undefined -> null;
_ -> CreatedAt
end,
[{id, Id},
{type, Type},
{config, maps:to_list(Config)},
{created_at, NCreatedAt},
{description, Desc}]
end, emqx_rule_registry:get_resources()).
export_blacklist() ->
lists:map(fun(#banned{who = Who, by = By, reason = Reason, at = At, until = Until}) ->
NWho = case Who of
{peerhost, Peerhost} -> {peerhost, inet:ntoa(Peerhost)};
_ -> Who
end,
[{who, [NWho]}, {by, By}, {reason, Reason}, {at, At}, {until, Until}]
end, ets:tab2list(emqx_banned)).
export_applications() ->
lists:map(fun({_, AppID, AppSecret, Name, Desc, Status, Expired}) ->
[{id, AppID}, {secret, AppSecret}, {name, Name}, {desc, Desc}, {status, Status}, {expired, Expired}]
end, ets:tab2list(mqtt_app)).
export_users() ->
lists:map(fun({_, Username, Password, Tags}) ->
[{username, Username}, {password, base64:encode(Password)}, {tags, Tags}]
end, ets:tab2list(mqtt_admin)).
export_auth_mnesia() ->
case ets:info(emqx_user) of
undefined -> [];
_ ->
lists:map(fun({_, {Type, Login}, Password, CreatedAt}) ->
[{login, Login}, {type, Type}, {password, base64:encode(Password)}, {created_at, CreatedAt}]
end, ets:tab2list(emqx_user))
end.
export_acl_mnesia() ->
case ets:info(emqx_acl) of
undefined -> [];
_ ->
lists:map(fun({_, Filter, Action, Access, CreatedAt}) ->
Filter1 = case Filter of
{{Type, TypeValue}, Topic} ->
[{type, Type}, {type_value, TypeValue}, {topic, Topic}];
{Type, Topic} ->
[{type, Type}, {topic, Topic}]
end,
Filter1 ++ [{action, Action}, {access, Access}, {created_at, CreatedAt}]
end, ets:tab2list(emqx_acl))
end.
import_rules(Rules) ->
lists:foreach(fun(Rule) ->
import_rule(Rule)
end, Rules).
import_resources(Reources) ->
lists:foreach(fun(Resource) ->
import_resource(Resource)
end, Reources).
import_rule(#{<<"id">> := RuleId,
<<"rawsql">> := RawSQL,
<<"actions">> := Actions,
<<"enabled">> := Enabled,
<<"description">> := Desc}) ->
Rule = #{id => RuleId,
rawsql => RawSQL,
actions => map_to_actions(Actions),
enabled => Enabled,
description => Desc},
try emqx_rule_engine:create_rule(Rule)
catch throw:{resource_not_initialized, _ResId} ->
emqx_rule_engine:create_rule(Rule#{enabled => false})
end.
import_resource(#{<<"id">> := Id,
<<"type">> := Type,
<<"config">> := Config,
<<"created_at">> := CreatedAt,
<<"description">> := Desc}) ->
NCreatedAt = case CreatedAt of
null -> undefined;
_ -> CreatedAt
end,
emqx_rule_engine:create_resource(#{id => Id,
type => any_to_atom(Type),
config => Config,
created_at => NCreatedAt,
description => Desc}).
import_resources_and_rules(Resources, Rules, FromVersion)
when FromVersion =:= "4.0" orelse FromVersion =:= "4.1" orelse FromVersion =:= "4.2" ->
Configs = lists:foldl(fun(#{<<"id">> := ID,
<<"type">> := <<"web_hook">>,
<<"config">> := #{<<"content_type">> := ContentType,
<<"headers">> := Headers,
<<"method">> := Method,
<<"url">> := URL}} = Resource, Acc) ->
NConfig = #{<<"connect_timeout">> => 5,
<<"request_timeout">> => 5,
<<"cacertfile">> => <<>>,
<<"certfile">> => <<>>,
<<"keyfile">> => <<>>,
<<"pool_size">> => 8,
<<"url">> => URL,
<<"verify">> => true},
NResource = Resource#{<<"config">> := NConfig},
import_resource(NResource),
NHeaders = maps:put(<<"content-type">>, ContentType, Headers),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
(Resource, Acc) ->
import_resource(Resource),
Acc
end, [], Resources),
lists:foreach(fun(#{<<"actions">> := Actions} = Rule) ->
NActions = apply_new_config(Actions, Configs),
import_rule(Rule#{<<"actions">> := NActions})
end, Rules);
import_resources_and_rules(Resources, Rules, _FromVersion) ->
import_resources(Resources),
import_rules(Rules).
apply_new_config(Actions, Configs) ->
apply_new_config(Actions, Configs, []).
apply_new_config([], _Configs, Acc) ->
Acc;
apply_new_config([Action = #{<<"name">> := <<"data_to_webserver">>,
<<"args">> := #{<<"$resource">> := ID,
<<"path">> := Path,
<<"payload_tmpl">> := PayloadTmpl}} | More], Configs, Acc) ->
case proplists:get_value(ID, Configs, undefined) of
undefined ->
apply_new_config(More, Configs, [Action | Acc]);
#{headers := Headers, method := Method} ->
Args = #{<<"$resource">> => ID,
<<"body">> => PayloadTmpl,
<<"headers">> => Headers,
<<"method">> => Method,
<<"path">> => Path},
apply_new_config(More, Configs, [Action#{<<"args">> := Args} | Acc])
end.
import_blacklist(Blacklist) ->
lists:foreach(fun(#{<<"who">> := Who,
<<"by">> := By,
<<"reason">> := Reason,
<<"at">> := At,
<<"until">> := Until}) ->
NWho = case Who of
#{<<"peerhost">> := Peerhost} ->
{ok, NPeerhost} = inet:parse_address(Peerhost),
{peerhost, NPeerhost};
#{<<"clientid">> := ClientId} -> {clientid, ClientId};
#{<<"username">> := Username} -> {username, Username}
end,
emqx_banned:create(#banned{who = NWho, by = By, reason = Reason, at = At, until = Until})
end, Blacklist).
import_applications(Apps) ->
lists:foreach(fun(#{<<"id">> := AppID,
<<"secret">> := AppSecret,
<<"name">> := Name,
<<"desc">> := Desc,
<<"status">> := Status,
<<"expired">> := Expired}) ->
NExpired = case is_integer(Expired) of
true -> Expired;
false -> undefined
end,
emqx_mgmt_auth:force_add_app(AppID, Name, AppSecret, Desc, Status, NExpired)
end, Apps).
import_users(Users) ->
lists:foreach(fun(#{<<"username">> := Username,
<<"password">> := Password,
<<"tags">> := Tags}) ->
NPassword = base64:decode(Password),
emqx_dashboard_admin:force_add_user(Username, NPassword, Tags)
end, Users).
import_auth_clientid(Lists) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
[ mnesia:dirty_write({emqx_user, {clientid, Clientid}, base64:decode(Password), erlang:system_time(millisecond)})
|| #{<<"clientid">> := Clientid, <<"password">> := Password} <- Lists ]
end.
import_auth_username(Lists) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
[ mnesia:dirty_write({emqx_user, {username, Username}, base64:decode(Password), erlang:system_time(millisecond)})
|| #{<<"username">> := Username, <<"password">> := Password} <- Lists ]
end.
import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
CreatedAt = erlang:system_time(millisecond),
[ begin
mnesia:dirty_write({emqx_user, {username, Login}, base64:decode(Password), CreatedAt})
end
|| #{<<"login">> := Login,
<<"password">> := Password} <- Auths ]
end;
import_auth_mnesia(Auths, _) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
[ mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt})
|| #{<<"login">> := Login,
<<"type">> := Type,
<<"password">> := Password,
<<"created_at">> := CreatedAt } <- Auths ]
end.
import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" ->
case ets:info(emqx_acl) of
undefined -> ok;
_ ->
CreatedAt = erlang:system_time(millisecond),
[begin
Allow1 = case any_to_atom(Allow) of
true -> allow;
false -> deny
end,
mnesia:dirty_write({emqx_acl, {{username, Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt})
end || #{<<"login">> := Login,
<<"topic">> := Topic,
<<"allow">> := Allow,
<<"action">> := Action} <- Acls]
end;
import_acl_mnesia(Acls, _) ->
case ets:info(emqx_acl) of
undefined -> ok;
_ ->
[ begin
Filter = case maps:get(<<"type_value">>, Map, undefined) of
undefined ->
{any_to_atom(maps:get(<<"type">>, Map)), maps:get(<<"topic">>, Map)};
Value ->
{{any_to_atom(maps:get(<<"type">>, Map)), Value}, maps:get(<<"topic">>, Map)}
end,
mnesia:dirty_write({emqx_acl ,Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt})
end
|| Map = #{<<"action">> := Action,
<<"access">> := Access,
<<"created_at">> := CreatedAt} <- Acls ]
end.
any_to_atom(L) when is_list(L) -> list_to_atom(L);
any_to_atom(B) when is_binary(B) -> binary_to_atom(B, utf8);
any_to_atom(A) when is_atom(A) -> A.
to_version(Version) when is_integer(Version) ->
integer_to_list(Version);
to_version(Version) when is_binary(Version) ->
binary_to_list(Version);
to_version(Version) when is_list(Version) ->
Version.
%%--------------------------------------------------------------------
%% Telemtry API
@ -968,21 +551,4 @@ max_row_limit() ->
table_size(Tab) -> ets:info(Tab, size).
map_to_actions(Maps) ->
[map_to_action(M) || M <- Maps].
map_to_action(Map = #{<<"id">> := ActionInstId, <<"name">> := Name, <<"args">> := Args}) ->
#{id => ActionInstId,
name => any_to_atom(Name),
args => Args,
fallbacks => map_to_actions(maps:get(<<"fallbacks">>, Map, []))}.
actions_to_prop_list(Actions) ->
[action_to_prop_list(Act) || Act <- Actions].
action_to_prop_list({action_instance, ActionInstId, Name, FallbackActions, Args}) ->
[{id, ActionInstId},
{name, Name},
{fallbacks, actions_to_prop_list(FallbackActions)},
{args, Args}].


@ -75,46 +75,10 @@
]).
export(_Bindings, _Params) ->
Rules = emqx_mgmt:export_rules(),
Resources = emqx_mgmt:export_resources(),
Blacklist = emqx_mgmt:export_blacklist(),
Apps = emqx_mgmt:export_applications(),
Users = emqx_mgmt:export_users(),
AuthMnesia = emqx_mgmt:export_auth_mnesia(),
AclMnesia = emqx_mgmt:export_acl_mnesia(),
Seconds = erlang:system_time(second),
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
NFilename = filename:join([emqx:get_env(data_dir), Filename]),
Version = string:sub_string(emqx_sys:version(), 1, 3),
Data = [{version, erlang:list_to_binary(Version)},
{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))},
{rules, Rules},
{resources, Resources},
{blacklist, Blacklist},
{apps, Apps},
{users, Users},
{auth_mnesia, AuthMnesia},
{acl_mnesia, AclMnesia}
],
Bin = emqx_json:encode(Data),
ok = filelib:ensure_dir(NFilename),
case file:write_file(NFilename, Bin) of
ok ->
case file:read_file_info(NFilename) of
{ok, #file_info{size = Size, ctime = {{Y0, M0, D0}, {H0, MM0, S0}}}} ->
CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y0, M0, D0, H0, MM0, S0]),
return({ok, [{filename, list_to_binary(Filename)},
{size, Size},
{created_at, list_to_binary(CreatedAt)},
{node, node()}
]});
{error, Reason} ->
return({error, Reason})
end;
{error, Reason} ->
return({error, Reason})
case emqx_mgmt_data_backup:export() of
{ok, File = #{filename := Filename}} ->
return({ok, File#{filename => filename:basename(Filename)}});
Return -> return(Return)
end.
list_exported(_Bindings, _Params) ->
@ -158,7 +122,7 @@ import(_Bindings, Params) ->
case lists:member(Node,
[ erlang:atom_to_binary(N, utf8) || N <- ekka_mnesia:running_nodes() ]
) of
true -> rpc:call(erlang:binary_to_atom(Node, utf8), ?MODULE, do_import, [Filename]);
true -> return(rpc:call(erlang:binary_to_atom(Node, utf8), ?MODULE, do_import, [Filename]));
false -> return({error, no_existent_node})
end
end,
@ -167,34 +131,7 @@ import(_Bindings, Params) ->
do_import(Filename) ->
FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
case file:read_file(FullFilename) of
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = emqx_mgmt:to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
emqx_mgmt:import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version),
emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])),
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
_ = emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
_ = emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
_ = emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
_ = emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
logger:debug("The emqx data has been imported successfully"),
ok
catch Class:Reason:Stack ->
logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
{error, import_failed}
end;
false ->
logger:error("Unsupported version: ~p", [Version]),
{error, unsupported_version}
end;
{error, Reason} ->
{error, Reason}
end.
emqx_mgmt_data_backup:import(FullFilename).
download(#{filename := Filename}, _Params) ->
FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
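
For completeness, a hedged sketch of exercising the refactored export handler over the management HTTP API; the /api/v4/data/export path, the 8081 port and the admin:public application credentials are assumptions based on a default 4.x setup, not taken from this diff:

curl -s -u admin:public -X POST http://127.0.0.1:8081/api/v4/data/export
# per export/2 above, a successful call returns code 0 plus the basename of the
# emqx-export-*.json file produced by emqx_mgmt_data_backup:export/0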


@ -78,7 +78,19 @@ subscribe(_Bindings, Params) ->
publish(_Bindings, Params) ->
logger:debug("API publish Params:~p", [Params]),
{ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params),
return(do_publish(ClientId, Topic, Qos, Retain, Payload)).
case do_publish(ClientId, Topic, Qos, Retain, Payload) of
{ok, MsgIds} ->
case get_value(<<"return">>, Params, undefined) of
undefined -> return(ok);
_Val ->
case get_value(<<"topics">>, Params, undefined) of
undefined -> return({ok, #{msgid => lists:last(MsgIds)}});
_ -> return({ok, #{msgids => MsgIds}})
end
end;
Result ->
return(Result)
end.
unsubscribe(_Bindings, Params) ->
logger:debug("API unsubscribe Params:~p", [Params]),
@ -119,7 +131,7 @@ loop_publish([], Result) ->
loop_publish([Params | ParamsN], Acc) ->
{ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params),
Code = case do_publish(ClientId, Topic, Qos, Retain, Payload) of
ok -> 0;
{ok, _} -> 0;
{_, Code0, _} -> Code0
end,
Result = #{topic => resp_topic(get_value(<<"topic">>, Params), get_value(<<"topics">>, Params, <<"">>)),
@ -153,11 +165,12 @@ do_subscribe(ClientId, Topics, QoS) ->
do_publish(_ClientId, [], _Qos, _Retain, _Payload) ->
{ok, ?ERROR15, bad_topic};
do_publish(ClientId, Topics, Qos, Retain, Payload) ->
lists:foreach(fun(Topic) ->
MsgIds = lists:map(fun(Topic) ->
Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}})
_ = emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}}),
emqx_guid:to_hexstr(Msg#message.id)
end, Topics),
ok.
{ok, MsgIds}.
do_unsubscribe(ClientId, Topic) ->
case validate_by_filter(Topic) of
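
The publish handler above now reports the generated message ID(s) when the request asks for them. A hedged usage sketch; the /api/v4/mqtt/publish path, the 8081 port and the admin:public credentials are assumptions based on a default 4.x setup:

curl -s -u admin:public -H 'Content-Type: application/json' -X POST \
  http://127.0.0.1:8081/api/v4/mqtt/publish \
  -d '{"topic":"t/1","qos":1,"retain":false,"clientid":"example","payload":"hello","return":true}'
# with "return" present, do_publish/5 yields {ok, MsgIds}, so the response is
# expected to carry a "msgid" field (or "msgids" when "topics" is used)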


@ -23,8 +23,6 @@
-define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~s~n", [Cmd, Descr])).
-import(lists, [foreach/2]).
-export([load/0]).
-export([ status/1
@ -41,7 +39,6 @@
, log/1
, mgmt/1
, data/1
, modules/1
]).
-define(PROC_INFOKEYS, [status,
@ -280,7 +277,7 @@ if_valid_qos(QoS, Fun) ->
end.
plugins(["list"]) ->
foreach(fun print/1, emqx_plugins:list());
lists:foreach(fun print/1, emqx_plugins:list());
plugins(["load", Name]) ->
case emqx_plugins:load(list_to_atom(Name)) of
@ -322,44 +319,6 @@ plugins(_) ->
{"plugins reload <Plugin>", "Reload plugin"}
]).
%%--------------------------------------------------------------------
%% @doc Modules Command
modules(["list"]) ->
foreach(fun(Module) -> print({module, Module}) end, emqx_modules:list());
modules(["load", Name]) ->
case emqx_modules:load(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Module ~s loaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Load module ~s error: ~p.~n", [Name, Reason])
end;
modules(["unload", Name]) ->
case emqx_modules:unload(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Module ~s unloaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Unload module ~s error: ~p.~n", [Name, Reason])
end;
modules(["reload", "emqx_mod_acl_internal" = Name]) ->
case emqx_modules:reload(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Module ~s reloaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Reload module ~s error: ~p.~n", [Name, Reason])
end;
modules(["reload", Name]) ->
emqx_ctl:print("Module: ~p does not need to be reloaded.~n", [Name]);
modules(_) ->
emqx_ctl:usage([{"modules list", "Show loaded modules"},
{"modules load <Module>", "Load module"},
{"modules unload <Module>", "Unload module"},
{"modules reload <Module>", "Reload module"}
]).
%%--------------------------------------------------------------------
%% @doc vm command
@ -460,7 +419,7 @@ log(_) ->
%% @doc Trace Command
trace(["list"]) ->
foreach(fun({{Who, Name}, {Level, LogFile}}) ->
lists:foreach(fun({{Who, Name}, {Level, LogFile}}) ->
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Who, Name, Level, LogFile])
end, emqx_tracer:lookup_traces());
@ -509,7 +468,7 @@ trace_off(Who, Name) ->
%% @doc Listeners Command
listeners([]) ->
foreach(fun({{Protocol, ListenOn}, _Pid}) ->
lists:foreach(fun({{Protocol, ListenOn}, _Pid}) ->
Info = [{listen_on, {string, emqx_listeners:format_listen_on(ListenOn)}},
{acceptors, esockd:get_acceptors({Protocol, ListenOn})},
{max_conns, esockd:get_max_connections({Protocol, ListenOn})},
@ -517,9 +476,9 @@ listeners([]) ->
{shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}
],
emqx_ctl:print("~s~n", [listener_identifier(Protocol, ListenOn)]),
foreach(fun indent_print/1, Info)
lists:foreach(fun indent_print/1, Info)
end, esockd:listeners()),
foreach(fun({Protocol, Opts}) ->
lists:foreach(fun({Protocol, Opts}) ->
Port = proplists:get_value(port, Opts),
Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}},
{acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)},
@ -527,7 +486,7 @@ listeners([]) ->
{current_conn, proplists:get_value(all_connections, Opts)},
{shutdown_count, []}],
emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]),
foreach(fun indent_print/1, Info)
lists:foreach(fun indent_print/1, Info)
end, ranch:info());
listeners(["stop", Name = "http" ++ _N | _MaybePort]) ->
@ -582,59 +541,21 @@ stop_listener(#{listen_on := ListenOn} = Listener, _Input) ->
%% @doc data Command
data(["export"]) ->
Rules = emqx_mgmt:export_rules(),
Resources = emqx_mgmt:export_resources(),
Blacklist = emqx_mgmt:export_blacklist(),
Apps = emqx_mgmt:export_applications(),
Users = emqx_mgmt:export_users(),
AuthMnesia = emqx_mgmt:export_auth_mnesia(),
AclMnesia = emqx_mgmt:export_acl_mnesia(),
Seconds = erlang:system_time(second),
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
NFilename = filename:join([emqx:get_env(data_dir), Filename]),
Version = string:sub_string(emqx_sys:version(), 1, 3),
Data = [{version, erlang:list_to_binary(Version)},
{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))},
{rules, Rules},
{resources, Resources},
{blacklist, Blacklist},
{apps, Apps},
{users, Users},
{auth_mnesia, AuthMnesia},
{acl_mnesia, AclMnesia}],
ok = filelib:ensure_dir(NFilename),
case file:write_file(NFilename, emqx_json:encode(Data)) of
ok ->
emqx_ctl:print("The emqx data has been successfully exported to ~s.~n", [NFilename]);
case emqx_mgmt_data_backup:export() of
{ok, #{filename := Filename}} ->
emqx_ctl:print("The emqx data has been successfully exported to ~s.~n", [Filename]);
{error, Reason} ->
emqx_ctl:print("The emqx data export failed due to ~p.~n", [Reason])
end;
data(["import", Filename]) ->
case file:read_file(Filename) of
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = emqx_mgmt:to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
emqx_mgmt:import_resources(maps:get(<<"resources">>, Data, [])),
emqx_mgmt:import_rules(maps:get(<<"rules">>, Data, [])),
emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])),
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
_ = emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
_ = emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
_ = emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
_ = emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
emqx_ctl:print("The emqx data has been imported successfully.~n")
catch Class:Reason:Stack ->
emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}])
end;
false ->
emqx_ctl:print("Unsupported version: ~p~n", [Version])
end;
case emqx_mgmt_data_backup:import(Filename) of
ok ->
emqx_ctl:print("The emqx data has been imported successfully.~n");
{error, import_failed} ->
emqx_ctl:print("The emqx data import failed.~n");
{error, unsupported_version} ->
emqx_ctl:print("The emqx data import failed: Unsupported version.~n");
{error, Reason} ->
emqx_ctl:print("The emqx data import failed: ~0p while reading ~s.~n", [Reason, Filename])
end;
@ -713,10 +634,6 @@ print(#plugin{name = Name, descr = Descr, active = Active}) ->
emqx_ctl:print("Plugin(~s, description=~s, active=~s)~n",
[Name, Descr, Active]);
print({module, {Name, Active}}) ->
emqx_ctl:print("Module(~s, description=~s, active=~s)~n",
[Name, Name:description(), Active]);
print({emqx_suboption, {{Pid, Topic}, Options}}) when is_pid(Pid) ->
emqx_ctl:print("~s -> ~s~n", [maps:get(subid, Options), Topic]).
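
The data export/import commands above now delegate to emqx_mgmt_data_backup, while their command-line surface stays the same; a brief usage sketch (the export file name is illustrative):

./bin/emqx_ctl data export
# The emqx data has been successfully exported to <data_dir>/emqx-export-....json.
./bin/emqx_ctl data import emqx-export-2021-3-4-22-40-36.json
# The emqx data has been imported successfully.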


@ -0,0 +1,570 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_data_backup).
-include("emqx_mgmt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("kernel/include/file.hrl").
-ifdef(EMQX_ENTERPRISE).
-export([ export_modules/0
, export_schemas/0
, export_confs/0
, import_modules/1
, import_schemas/1
, import_confs/2
]).
-endif.
-export([ export_rules/0
, export_resources/0
, export_blacklist/0
, export_applications/0
, export_users/0
, export_auth_mnesia/0
, export_acl_mnesia/0
, import_resources_and_rules/3
, import_rules/1
, import_resources/1
, import_blacklist/1
, import_applications/1
, import_users/1
, import_auth_clientid/1 %% BACKW: 4.1.x
, import_auth_username/1 %% BACKW: 4.1.x
, import_auth_mnesia/2
, import_acl_mnesia/2
, to_version/1
]).
-export([ export/0
, import/1
]).
%%--------------------------------------------------------------------
%% Data Export and Import
%%--------------------------------------------------------------------
export_rules() ->
lists:map(fun({_, RuleId, _, RawSQL, _, _, _, _, _, _, Actions, Enabled, Desc}) ->
[{id, RuleId},
{rawsql, RawSQL},
{actions, actions_to_prop_list(Actions)},
{enabled, Enabled},
{description, Desc}]
end, emqx_rule_registry:get_rules()).
export_resources() ->
lists:map(fun({_, Id, Type, Config, CreatedAt, Desc}) ->
NCreatedAt = case CreatedAt of
undefined -> null;
_ -> CreatedAt
end,
[{id, Id},
{type, Type},
{config, maps:to_list(Config)},
{created_at, NCreatedAt},
{description, Desc}]
end, emqx_rule_registry:get_resources()).
export_blacklist() ->
lists:map(fun(#banned{who = Who, by = By, reason = Reason, at = At, until = Until}) ->
NWho = case Who of
{peerhost, Peerhost} -> {peerhost, inet:ntoa(Peerhost)};
_ -> Who
end,
[{who, [NWho]}, {by, By}, {reason, Reason}, {at, At}, {until, Until}]
end, ets:tab2list(emqx_banned)).
export_applications() ->
lists:map(fun({_, AppID, AppSecret, Name, Desc, Status, Expired}) ->
[{id, AppID}, {secret, AppSecret}, {name, Name}, {desc, Desc}, {status, Status}, {expired, Expired}]
end, ets:tab2list(mqtt_app)).
export_users() ->
lists:map(fun({_, Username, Password, Tags}) ->
[{username, Username}, {password, base64:encode(Password)}, {tags, Tags}]
end, ets:tab2list(mqtt_admin)).
export_auth_mnesia() ->
case ets:info(emqx_user) of
undefined -> [];
_ ->
lists:map(fun({_, {Type, Login}, Password, CreatedAt}) ->
[{login, Login}, {type, Type}, {password, base64:encode(Password)}, {created_at, CreatedAt}]
end, ets:tab2list(emqx_user))
end.
export_acl_mnesia() ->
case ets:info(emqx_acl) of
undefined -> [];
_ ->
lists:map(fun({_, Filter, Action, Access, CreatedAt}) ->
Filter1 = case Filter of
{{Type, TypeValue}, Topic} ->
[{type, Type}, {type_value, TypeValue}, {topic, Topic}];
{Type, Topic} ->
[{type, Type}, {topic, Topic}]
end,
Filter1 ++ [{action, Action}, {access, Access}, {created_at, CreatedAt}]
end, ets:tab2list(emqx_acl))
end.
-ifdef(EMQX_ENTERPRISE).
export_modules() ->
case ets:info(emqx_modules) of
undefined -> [];
_ ->
lists:map(fun({_, Id, Type, Config, Enabled, CreatedAt, Description}) ->
[{id, Id},
{type, Type},
{config, Config},
{enabled, Enabled},
{created_at, CreatedAt},
{description, Description}
]
end, ets:tab2list(emqx_modules))
end.
export_schemas() ->
case ets:info(emqx_schema) of
undefined -> [];
_ ->
[emqx_schema_api:format_schema(Schema) || Schema <- emqx_schema_registry:get_all_schemas()]
end.
export_confs() ->
case ets:info(emqx_conf_info) of
undefined -> {[], []};
_ ->
{lists:map(fun({_, Key, Confs}) ->
case Key of
{_Zone, Name} ->
[{zone, list_to_binary(Name)},
{confs, confs_to_binary(Confs)}];
{_Listener, Type, Name} ->
[{type, list_to_binary(Type)},
{name, list_to_binary(Name)},
{confs, confs_to_binary(Confs)}];
Name ->
[{name, list_to_binary(Name)},
{confs, confs_to_binary(Confs)}]
end
end, ets:tab2list(emqx_conf_b)),
lists:map(fun({_, {_Listener, Type, Name}, Status}) ->
[{type, list_to_binary(Type)},
{name, list_to_binary(Name)},
{status, Status}]
end, ets:tab2list(emqx_listeners_state))}
end.
confs_to_binary(Confs) ->
[{list_to_binary(Key), list_to_binary(Val)} || {Key, Val} <-Confs].
-else.
import_rule(#{<<"id">> := RuleId,
<<"rawsql">> := RawSQL,
<<"actions">> := Actions,
<<"enabled">> := Enabled,
<<"description">> := Desc}) ->
Rule = #{id => RuleId,
rawsql => RawSQL,
actions => map_to_actions(Actions),
enabled => Enabled,
description => Desc},
try emqx_rule_engine:create_rule(Rule)
catch throw:{resource_not_initialized, _ResId} ->
emqx_rule_engine:create_rule(Rule#{enabled => false})
end.
map_to_actions(Maps) ->
[map_to_action(M) || M <- Maps].
map_to_action(Map = #{<<"id">> := ActionInstId, <<"name">> := Name, <<"args">> := Args}) ->
#{id => ActionInstId,
name => any_to_atom(Name),
args => Args,
fallbacks => map_to_actions(maps:get(<<"fallbacks">>, Map, []))}.
-endif.
import_rules(Rules) ->
lists:foreach(fun(Rule) ->
import_rule(Rule)
end, Rules).
import_resources(Resources) ->
lists:foreach(fun(Resource) ->
import_resource(Resource)
end, Resources).
import_resource(#{<<"id">> := Id,
<<"type">> := Type,
<<"config">> := Config,
<<"created_at">> := CreatedAt,
<<"description">> := Desc}) ->
NCreatedAt = case CreatedAt of
null -> undefined;
_ -> CreatedAt
end,
emqx_rule_engine:create_resource(#{id => Id,
type => any_to_atom(Type),
config => Config,
created_at => NCreatedAt,
description => Desc}).
-ifdef(EMQX_ENTERPRISE).
import_resources_and_rules(Resources, Rules, _FromVersion) ->
import_resources(Resources),
import_rules(Rules).
-else.
import_resources_and_rules(Resources, Rules, FromVersion)
when FromVersion =:= "4.0" orelse FromVersion =:= "4.1" orelse FromVersion =:= "4.2" ->
Configs = lists:foldl(fun(#{<<"id">> := ID,
<<"type">> := <<"web_hook">>,
<<"config">> := #{<<"content_type">> := ContentType,
<<"headers">> := Headers,
<<"method">> := Method,
<<"url">> := URL}} = Resource, Acc) ->
NConfig = #{<<"connect_timeout">> => 5,
<<"request_timeout">> => 5,
<<"cacertfile">> => <<>>,
<<"certfile">> => <<>>,
<<"keyfile">> => <<>>,
<<"pool_size">> => 8,
<<"url">> => URL,
<<"verify">> => true},
NResource = Resource#{<<"config">> := NConfig},
{ok, _Resource} = import_resource(NResource),
NHeaders = maps:put(<<"content-type">>, ContentType, Headers),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
(Resource, Acc) ->
{ok, _Resource} = import_resource(Resource),
Acc
end, [], Resources),
lists:foreach(fun(#{<<"actions">> := Actions} = Rule) ->
NActions = apply_new_config(Actions, Configs),
import_rule(Rule#{<<"actions">> := NActions})
end, Rules);
import_resources_and_rules(Resources, Rules, _FromVersion) ->
import_resources(Resources),
import_rules(Rules).
apply_new_config(Actions, Configs) ->
apply_new_config(Actions, Configs, []).
apply_new_config([], _Configs, Acc) ->
Acc;
apply_new_config([Action = #{<<"name">> := <<"data_to_webserver">>,
<<"args">> := #{<<"$resource">> := ID,
<<"path">> := Path,
<<"payload_tmpl">> := PayloadTmpl}} | More], Configs, Acc) ->
case proplists:get_value(ID, Configs, undefined) of
undefined ->
apply_new_config(More, Configs, [Action | Acc]);
#{headers := Headers, method := Method} ->
Args = #{<<"$resource">> => ID,
<<"body">> => PayloadTmpl,
<<"headers">> => Headers,
<<"method">> => Method,
<<"path">> => Path},
apply_new_config(More, Configs, [Action#{<<"args">> := Args} | Acc])
end.
-endif.
actions_to_prop_list(Actions) ->
[action_to_prop_list(Act) || Act <- Actions].
action_to_prop_list({action_instance, ActionInstId, Name, FallbackActions, Args}) ->
[{id, ActionInstId},
{name, Name},
{fallbacks, actions_to_prop_list(FallbackActions)},
{args, Args}].
import_blacklist(Blacklist) ->
lists:foreach(fun(#{<<"who">> := Who,
<<"by">> := By,
<<"reason">> := Reason,
<<"at">> := At,
<<"until">> := Until}) ->
NWho = case Who of
#{<<"peerhost">> := Peerhost} ->
{ok, NPeerhost} = inet:parse_address(Peerhost),
{peerhost, NPeerhost};
#{<<"clientid">> := ClientId} -> {clientid, ClientId};
#{<<"username">> := Username} -> {username, Username}
end,
emqx_banned:create(#banned{who = NWho, by = By, reason = Reason, at = At, until = Until})
end, Blacklist).
import_applications(Apps) ->
lists:foreach(fun(#{<<"id">> := AppID,
<<"secret">> := AppSecret,
<<"name">> := Name,
<<"desc">> := Desc,
<<"status">> := Status,
<<"expired">> := Expired}) ->
NExpired = case is_integer(Expired) of
true -> Expired;
false -> undefined
end,
emqx_mgmt_auth:force_add_app(AppID, Name, AppSecret, Desc, Status, NExpired)
end, Apps).
import_users(Users) ->
lists:foreach(fun(#{<<"username">> := Username,
<<"password">> := Password,
<<"tags">> := Tags}) ->
NPassword = base64:decode(Password),
emqx_dashboard_admin:force_add_user(Username, NPassword, Tags)
end, Users).
import_auth_clientid(Lists) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
lists:foreach(fun(#{<<"clientid">> := Clientid, <<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {clientid, Clientid}, base64:decode(Password), erlang:system_time(millisecond)})
end, Lists)
end.
import_auth_username(Lists) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
lists:foreach(fun(#{<<"username">> := Username, <<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {username, Username}, base64:decode(Password), erlang:system_time(millisecond)})
end, Lists)
end.
-ifdef(EMQX_ENTERPRISE).
import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" ->
do_import_auth_mnesia_by_old_data(Auths);
import_auth_mnesia(Auths, _) ->
do_import_auth_mnesia(Auths).
import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" ->
do_import_acl_mnesia_by_old_data(Acls);
import_acl_mnesia(Acls, _) ->
do_import_acl_mnesia(Acls).
-else.
import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" orelse
FromVersion =:= "4.2" ->
do_import_auth_mnesia_by_old_data(Auths);
import_auth_mnesia(Auths, _) ->
do_import_auth_mnesia(Auths).
import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" orelse
FromVersion =:= "4.2" ->
do_import_acl_mnesia_by_old_data(Acls);
import_acl_mnesia(Acls, _) ->
do_import_acl_mnesia(Acls).
-endif.
do_import_auth_mnesia_by_old_data(Auths) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
CreatedAt = erlang:system_time(millisecond),
lists:foreach(fun(#{<<"login">> := Login,
<<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {username, Login}, base64:decode(Password), CreatedAt})
end, Auths)
end.
do_import_auth_mnesia(Auths) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
lists:foreach(fun(#{<<"login">> := Login,
<<"type">> := Type,
<<"password">> := Password,
<<"created_at">> := CreatedAt }) ->
mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt})
end, Auths)
end.
do_import_acl_mnesia_by_old_data(Acls) ->
case ets:info(emqx_acl) of
undefined -> ok;
_ ->
CreatedAt = erlang:system_time(millisecond),
lists:foreach(fun(#{<<"login">> := Login,
<<"topic">> := Topic,
<<"allow">> := Allow,
<<"action">> := Action}) ->
Allow1 = case any_to_atom(Allow) of
true -> allow;
false -> deny
end,
mnesia:dirty_write({emqx_acl, {{username, Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt})
end, Acls)
end.
do_import_acl_mnesia(Acls) ->
case ets:info(emqx_acl) of
undefined -> ok;
_ ->
lists:foreach(fun(Map = #{<<"action">> := Action,
<<"access">> := Access,
<<"created_at">> := CreatedAt}) ->
Filter = case maps:get(<<"type_value">>, Map, undefined) of
undefined ->
{any_to_atom(maps:get(<<"type">>, Map)), maps:get(<<"topic">>, Map)};
Value ->
{{any_to_atom(maps:get(<<"type">>, Map)), Value}, maps:get(<<"topic">>, Map)}
end,
mnesia:dirty_write({emqx_acl, Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt})
end, Acls)
end.
-ifdef(EMQX_ENTERPRISE).
import_modules(Modules) ->
case ets:info(emqx_modules) of
undefined -> [];
_ ->
lists:foreach(fun(#{<<"id">> := Id,
<<"type">> := Type,
<<"config">> := Config,
<<"enabled">> := Enabled,
<<"created_at">> := CreatedAt,
<<"description">> := Description}) ->
emqx_modules:import_module({Id, any_to_atom(Type), Config, Enabled, CreatedAt, Description})
end, Modules)
end.
import_schemas(Schemas) ->
case ets:info(emqx_schema) of
undefined -> ok;
_ -> [emqx_schema_registry:add_schema(emqx_schema_api:make_schema_params(Schema)) || Schema <- Schemas]
end.
import_confs(Configs, ListenersState) ->
case ets:info(emqx_conf_info) of
undefined -> ok;
_ ->
emqx_conf:import_confs(Configs, ListenersState)
end.
-endif.
any_to_atom(L) when is_list(L) -> list_to_atom(L);
any_to_atom(B) when is_binary(B) -> binary_to_atom(B, utf8);
any_to_atom(A) when is_atom(A) -> A.
to_version(Version) when is_integer(Version) ->
integer_to_list(Version);
to_version(Version) when is_binary(Version) ->
binary_to_list(Version);
to_version(Version) when is_list(Version) ->
Version.
export() ->
Seconds = erlang:system_time(second),
Data = do_export_data() ++ [{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}],
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
NFilename = filename:join([emqx:get_env(data_dir), Filename]),
ok = filelib:ensure_dir(NFilename),
case file:write_file(NFilename, emqx_json:encode(Data)) of
ok ->
case file:read_file_info(NFilename) of
{ok, #file_info{size = Size, ctime = {{Y, M, D}, {H, MM, S}}}} ->
CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S]),
{ok, #{filename => list_to_binary(NFilename),
size => Size,
created_at => list_to_binary(CreatedAt),
node => node()
}};
Error -> Error
end;
Error -> Error
end.
do_export_data() ->
Version = string:sub_string(emqx_sys:version(), 1, 3),
[{version, erlang:list_to_binary(Version)},
{rules, export_rules()},
{resources, export_resources()},
{blacklist, export_blacklist()},
{apps, export_applications()},
{users, export_users()},
{auth_mnesia, export_auth_mnesia()},
{acl_mnesia, export_acl_mnesia()}
] ++ do_export_extra_data().
-ifdef(EMQX_ENTERPRISE).
do_export_extra_data() ->
{Configs, State} = export_confs(),
[{modules, export_modules()},
{schemas, export_schemas()},
{configs, Configs},
{listeners_state, State}
].
-else.
do_export_extra_data() -> [].
-endif.
import(Filename) ->
case file:read_file(Filename) of
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
do_import_data(Data, Version),
logger:debug("The emqx data has been imported successfully"),
ok
catch Class:Reason:Stack ->
logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
{error, import_failed}
end;
false ->
logger:error("Unsupported version: ~p", [Version]),
{error, unsupported_version}
end;
Error -> Error
end.
do_import_data(Data, Version) ->
do_import_extra_data(Data, Version),
import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version),
import_blacklist(maps:get(<<"blacklist">>, Data, [])),
import_applications(maps:get(<<"apps">>, Data, [])),
import_users(maps:get(<<"users">>, Data, [])),
import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
import_auth_username(maps:get(<<"auth_username">>, Data, [])),
import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version).
-ifdef(EMQX_ENTERPRISE).
do_import_extra_data(Data, _Version) ->
import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])),
import_modules(maps:get(<<"modules">>, Data, [])),
import_schemas(maps:get(<<"schemas">>, Data, [])),
ok.
-else.
do_import_extra_data(_Data, _Version) -> ok.
-endif.


@ -85,7 +85,7 @@ listener_name(Proto) ->
http_handlers() ->
Plugins = lists:map(fun(Plugin) -> Plugin#plugin.name end, emqx_plugins:list()),
[{"/api/v4", minirest:handler(#{apps => Plugins -- ?EXCEPT_PLUGIN,
[{"/api/v4", minirest:handler(#{apps => Plugins ++ [emqx_modules] -- ?EXCEPT_PLUGIN,
except => ?EXCEPT,
filter => fun filter/1}),
[{authorization, fun authorize_appid/1}]}].
@ -119,6 +119,7 @@ authorize_appid(Req) ->
_ -> false
end.
filter(#{app := emqx_modules}) -> true;
filter(#{app := App}) ->
case emqx_plugins:find_plugin(App) of
false -> false;


@ -44,7 +44,6 @@ groups() ->
t_clients_cmd,
t_vm_cmd,
t_plugins_cmd,
t_modules_cmd,
t_trace_cmd,
t_broker_cmd,
t_router_cmd,
@ -59,11 +58,11 @@ apps() ->
init_per_suite(Config) ->
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_modules, emqx_management, emqx_auth_mnesia]),
emqx_ct_helpers:start_apps([emqx_management, emqx_auth_mnesia]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_management, emqx_auth_mnesia, emqx_modules]).
emqx_ct_helpers:stop_apps([emqx_management, emqx_auth_mnesia]).
t_app(_Config) ->
{ok, AppSecret} = emqx_mgmt_auth:add_app(<<"app_id">>, <<"app_name">>),
@ -329,19 +328,6 @@ t_plugins_cmd(_) ->
),
unmock_print().
t_modules_cmd(_) ->
mock_print(),
meck:new(emqx_modules, [non_strict, passthrough]),
meck:expect(emqx_modules, load, fun(_) -> ok end),
meck:expect(emqx_modules, unload, fun(_) -> ok end),
meck:expect(emqx_modules, reload, fun(_) -> ok end),
?assertEqual(emqx_mgmt_cli:modules(["list"]), ok),
?assertEqual(emqx_mgmt_cli:modules(["load", "emqx_mod_presence"]),
"Module emqx_mod_presence loaded successfully.\n"),
?assertEqual(emqx_mgmt_cli:modules(["unload", "emqx_mod_presence"]),
"Module emqx_mod_presence unloaded successfully.\n"),
unmock_print().
t_cli(_) ->
mock_print(),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([""]), "status")),


@ -48,7 +48,6 @@ groups() ->
, metrics
, nodes
, plugins
, modules
, acl_cache
, pubsub
, routes_and_subscriptions
@ -58,13 +57,13 @@ groups() ->
}].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_modules, emqx_management, emqx_auth_mnesia]),
emqx_ct_helpers:start_apps([emqx_management, emqx_auth_mnesia]),
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_auth_mnesia, emqx_management, emqx_modules]),
emqx_ct_helpers:stop_apps([emqx_auth_mnesia, emqx_management]),
ekka_mnesia:ensure_stopped().
init_per_testcase(data, Config) ->
@ -102,7 +101,7 @@ get(Key, ResponseBody) ->
lookup_alarm(Name, [#{<<"name">> := Name} | _More]) ->
true;
lookup_alarm(Name, [_Alarm | More]) ->
lookup_alarm(Name, [_Alarm | More]) ->
lookup_alarm(Name, More);
lookup_alarm(_Name, []) ->
false.
@ -120,7 +119,7 @@ alarms(_) ->
?assert(is_existing(alarm1, emqx_alarm:get_alarms(activated))),
?assert(is_existing(alarm2, emqx_alarm:get_alarms(activated))),
{ok, Return1} = request_api(get, api_path(["alarms/activated"]), auth_header_()),
?assert(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return1))))),
?assert(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return1))))),
@ -231,7 +230,7 @@ clients(_) ->
{ok, Clients2} = request_api(get, api_path(["nodes", atom_to_list(node()),
"clients", binary_to_list(ClientId2)])
, auth_header_()),
?assertEqual(<<"client2">>, maps:get(<<"clientid">>, lists:nth(1, get(<<"data">>, Clients2)))),
?assertEqual(<<"client2">>, maps:get(<<"clientid">>, lists:nth(1, get(<<"data">>, Clients2)))),
{ok, Clients3} = request_api(get, api_path(["clients",
"username", binary_to_list(Username1)]),
@ -246,7 +245,7 @@ clients(_) ->
{ok, Clients5} = request_api(get, api_path(["clients"]), "_limit=100&_page=1", auth_header_()),
?assertEqual(2, maps:get(<<"count">>, get(<<"meta">>, Clients5))),
meck:new(emqx_mgmt, [passthrough, no_history]),
meck:expect(emqx_mgmt, kickout_client, 1, fun(_) -> {error, undefined} end),
@ -262,7 +261,7 @@ clients(_) ->
?assertEqual(?ERROR1, get(<<"code">>, MeckRet3)),
meck:unload(emqx_mgmt),
{ok, Ok} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()),
?assertEqual(?SUCCESS, get(<<"code">>, Ok)),
@ -382,64 +381,6 @@ plugins(_) ->
auth_header_()),
?assertEqual(<<"not_started">>, get(<<"message">>, Error2)).
modules(_) ->
emqx_modules:load_module(emqx_mod_presence, false),
timer:sleep(50),
{ok, Modules1} = request_api(get, api_path(["modules"]), auth_header_()),
[Modules11] = filter(get(<<"data">>, Modules1), <<"node">>, atom_to_binary(node(), utf8)),
[Module1] = filter(maps:get(<<"modules">>, Modules11), <<"name">>, <<"emqx_mod_presence">>),
?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module1)),
?assertEqual(true, maps:get(<<"active">>, Module1)),
{ok, _} = request_api(put,
api_path(["modules",
atom_to_list(emqx_mod_presence),
"unload"]),
auth_header_()),
{ok, Error1} = request_api(put,
api_path(["modules",
atom_to_list(emqx_mod_presence),
"unload"]),
auth_header_()),
?assertEqual(<<"not_started">>, get(<<"message">>, Error1)),
{ok, Modules2} = request_api(get,
api_path(["nodes", atom_to_list(node()), "modules"]),
auth_header_()),
[Module2] = filter(get(<<"data">>, Modules2), <<"name">>, <<"emqx_mod_presence">>),
?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module2)),
?assertEqual(false, maps:get(<<"active">>, Module2)),
{ok, _} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(emqx_mod_presence),
"load"]),
auth_header_()),
{ok, Modules3} = request_api(get,
api_path(["nodes", atom_to_list(node()), "modules"]),
auth_header_()),
[Module3] = filter(get(<<"data">>, Modules3), <<"name">>, <<"emqx_mod_presence">>),
?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module3)),
?assertEqual(true, maps:get(<<"active">>, Module3)),
{ok, _} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(emqx_mod_presence),
"unload"]),
auth_header_()),
{ok, Error2} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(emqx_mod_presence),
"unload"]),
auth_header_()),
?assertEqual(<<"not_started">>, get(<<"message">>, Error2)),
emqx_modules:unload(emqx_mod_presence).
acl_cache(_) ->
ClientId = <<"client1">>,
Topic = <<"mytopic">>,
@ -495,7 +436,7 @@ pubsub(_) ->
<<"topics">> => <<"">>,
<<"qos">> => 1,
<<"payload">> => <<"hello">>}),
?assertEqual(?ERROR15, get(<<"code">>, BadTopic2)),
?assertEqual(?ERROR15, get(<<"code">>, BadTopic2)),
{ok, BadTopic3} = request_api(post, api_path(["mqtt/unsubscribe"]), [], auth_header_(),
#{<<"clientid">> => ClientId,


@ -205,14 +205,18 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) ->
msg = Msg,
expiry_time = get_expiry_time(Msg, Env)});
{true, false} ->
case mnesia:dirty_read(?TAB, Topic) of
[_] ->
mnesia:dirty_write(?TAB, #retained{topic = topic2tokens(Topic),
msg = Msg,
expiry_time = get_expiry_time(Msg, Env)});
[] ->
?LOG(error, "Cannot retain message(topic=~s) for table is full!", [Topic])
end;
{atomic, _} = mnesia:transaction(
fun() ->
case mnesia:read(?TAB, Topic) of
[_] ->
mnesia:write(?TAB, #retained{topic = topic2tokens(Topic),
msg = Msg,
expiry_time = get_expiry_time(Msg, Env)}, write);
[] ->
?LOG(error, "Cannot retain message(topic=~s) for table is full!", [Topic])
end
end),
ok;
{true, _} ->
?LOG(error, "Cannot retain message(topic=~s) for table is full!", [Topic]);
{_, true} ->


@ -337,7 +337,7 @@ update_resource(#{id := Id}, NewParams) ->
P2 = case proplists:get_value(<<"config">>, NewParams) of
undefined -> #{};
[{}] -> #{};
Map -> #{<<"config">> => ?RAISE(maps:from_list(Map), {invalid_config, Map})}
Config -> #{<<"config">> => ?RAISE(json_term_to_map(Config), {invalid_config, Config})}
end,
case emqx_rule_engine:update_resource(Id, maps:merge(P1, P2)) of
ok ->
@ -552,4 +552,4 @@ get_rule_metrics(Id) ->
get_action_metrics(Id) ->
[maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]))
|| Node <- ekka_mnesia:running_nodes()].
|| Node <- ekka_mnesia:running_nodes()].


@ -407,8 +407,8 @@ delete_resource_type(Type) ->
init([]) ->
%% Enable stats timer
ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0),
ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
{read_concurrency, true}]),
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
{read_concurrency, true}]),
{ok, #{}}.
handle_call({add_rules, Rules}, _From, State) ->


@ -1,5 +1,5 @@
{deps,
[{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.3"}}}
[{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
]}.
{edoc_opts, [{preprocess, true}]}.


@ -104,9 +104,6 @@
-define(NO_PEERCERT, undefined).
%% TODO: fix when https://github.com/emqx/emqx-sn/pull/170 is merged
-dialyzer([{nowarn_function, [idle/3]}]).
%%--------------------------------------------------------------------
%% Exported APIs
%%--------------------------------------------------------------------
@ -201,7 +198,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
false -> emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId);
true -> <<TopicId:16>>
end,
case TopicName =/= undefined of
_ = case TopicName =/= undefined of
true ->
Msg = emqx_message:make(?NEG_QOS_CLIENT_ID, ?QOS_0, TopicName, Data),
emqx_broker:publish(Msg);
@ -590,11 +587,7 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
{reply, Reply, NChannel} ->
{reply, Reply, State#state{channel = NChannel}};
{shutdown, Reason, Reply, NChannel} ->
shutdown(Reason, Reply, State#state{channel = NChannel});
{shutdown, Reason, Reply, OutPacket, NChannel} ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(OutPacket, NState),
shutdown(Reason, Reply, NState)
shutdown(Reason, Reply, State#state{channel = NChannel})
end.
handle_info(Info, State = #state{channel = Channel}) ->
@ -609,8 +602,6 @@ handle_ping(_PingReq, State) ->
handle_timeout(TRef, TMsg, State = #state{channel = Channel}) ->
handle_return(emqx_channel:handle_timeout(TRef, TMsg, Channel), State).
handle_return(ok, State) ->
{keep_state, State};
handle_return({ok, NChannel}, State) ->
{keep_state, State#state{channel = NChannel}};
handle_return({ok, Replies, NChannel}, State) ->
@ -621,13 +612,7 @@ handle_return({shutdown, Reason, NChannel}, State) ->
handle_return({shutdown, Reason, OutPacket, NChannel}, State) ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(OutPacket, NState),
stop({shutdown, Reason}, NState);
handle_return({stop, Reason, NChannel}, State) ->
stop(Reason, State#state{channel = NChannel});
handle_return({stop, Reason, OutPacket, NChannel}, State) ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(OutPacket, NState),
stop(Reason, NState).
stop({shutdown, Reason}, NState).
next_events(Packet) when is_record(Packet, mqtt_packet) ->
next_event({outgoing, Packet});


@ -1,14 +1 @@
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{deps, []}.


@ -30,7 +30,7 @@ start(_StartType, _StartArgs) ->
translate_env(),
{ok, Sup} = emqx_web_hook_sup:start_link(),
{ok, PoolOpts} = application:get_env(?APP, pool_opts),
ehttpc_sup:start_pool(?APP, PoolOpts),
{ok, _Pid} = ehttpc_sup:start_pool(?APP, PoolOpts),
emqx_web_hook:register_metrics(),
emqx_web_hook:load(),
{ok, Sup}.


@ -20,8 +20,8 @@ mkdir -p "$RUNNER_LOG_DIR"
# Make sure data directory exists
mkdir -p "$RUNNER_DATA_DIR"
# cuttlefish try to read environment variables starting with "EMQX_", if not specified
export CUTTLEFISH_ENV_OVERRIDE_PREFIX="${CUTTLEFISH_ENV_OVERRIDE_PREFIX:-EMQX_}"
# cuttlefish try to read environment variables starting with "EMQX_"
export CUTTLEFISH_ENV_OVERRIDE_PREFIX='EMQX_'
relx_usage() {
command="$1"
@ -176,6 +176,7 @@ relx_nodetool() {
command="$1"; shift
ERL_FLAGS="$ERL_FLAGS $EPMD_ARG" \
ERL_LIBS="${LIB_EKKA_DIR}:${ERL_LIBS:-}" \
"$ERTS_DIR/bin/escript" "$ROOTDIR/bin/nodetool" "$NAME_TYPE" "$NAME" \
-setcookie "$COOKIE" "$command" "$@"
}
@ -338,6 +339,10 @@ case "$1" in
# Bootstrap daemon command (check perms & drop to $RUNNER_USER)
bootstrapd
# this flag passes down to console mode
# so we know it's intended to be run in daemon mode
export _EMQX_START_MODE="$1"
# Save this for later.
CMD=$1
case "$1" in
@ -517,6 +522,11 @@ case "$1" in
;;
esac
# set before generate_config
if [ "${_EMQX_START_MODE:-}" = '' ]; then
export EMQX_LOG__TO="${EMQX_LOG__TO:-console}"
fi
#generate app.config and vm.args
generate_config
@ -533,7 +543,8 @@ case "$1" in
# shellcheck disable=SC2086 # $RELX_CONFIG_PATH $CONFIG_ARGS $EPMD_ARG are supposed to be split by whitespace
# Build an array of arguments to pass to exec later on
# Build it here because this command will be used for logging.
set -- "$BINDIR/erlexec" -boot "$BOOTFILE" -mode "$CODE_LOADING_MODE" \
set -- "$BINDIR/erlexec" \
-boot "$BOOTFILE" -mode "$CODE_LOADING_MODE" \
-boot_var ERTS_LIB_DIR "$ERTS_LIB_DIR" \
-mnesia dir "\"${MNESIA_DATA_DIR}\"" \
$RELX_CONFIG_PATH $CONFIG_ARGS $EPMD_ARG
@ -556,6 +567,9 @@ case "$1" in
# start up the release in the foreground for use by runit
# or other supervision services
# set before generate_config
export EMQX_LOG__TO="${EMQX_LOG__TO:-console}"
#generate app.config and vm.args
generate_config


@ -27,11 +27,11 @@ relx_nodetool() {
command="$1"; shift
ERL_FLAGS="$ERL_FLAGS $EPMD_ARG $PROTO_DIST_ARG" \
ERL_LIBS="${LIB_EKKA_DIR}:${ERL_LIBS:-}" \
"$ERTS_DIR/bin/escript" "$ROOTDIR/bin/nodetool" "$NAME_TYPE" "$NAME" \
-setcookie "$COOKIE" "$command" "$@"
}
if [ -z "$NAME_ARG" ]; then
NODENAME="${EMQX_NODE_NAME:-}"
# check if there is a node running, inspect its name


@ -14,6 +14,7 @@ RUNNER_ETC_DIR="{{ runner_etc_dir }}"
RUNNER_DATA_DIR="{{ runner_data_dir }}"
RUNNER_USER="{{ runner_user }}"
EMQX_DISCR="{{ emqx_description }}"
LIB_EKKA_DIR="${RUNNER_LIB_DIR}/ekka-$(grep ekka "${RUNNER_ROOT_DIR}/releases/RELEASES" | awk -F '\"' '{print $2}')"
## computed vars
REL_NAME="emqx"


@ -18,7 +18,8 @@ RUN apk add --no-cache \
bsd-compat-headers \
libc-dev \
libstdc++ \
bash
bash \
jq
COPY . /emqx


@ -412,7 +412,7 @@ rpc.socket_buffer = 1MB
## - file: write logs only to file
## - console: write logs only to standard I/O
## - both: write logs both to file and standard I/O
log.to = both
log.to = file
## The log severity level.
##


@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_topic_metrics).
-module(emqx_mod_api_topic_metrics).
-import(minirest, [return/1]).
@ -57,8 +57,8 @@ list(#{topic := Topic0}, _Params) ->
execute_when_enabled(fun() ->
Topic = emqx_mgmt_util:urldecode(Topic0),
case safe_validate(Topic) of
true ->
case emqx_mgmt:get_topic_metrics(Topic) of
true ->
case get_topic_metrics(Topic) of
{error, Reason} -> return({error, Reason});
Metrics -> return({ok, maps:from_list(Metrics)})
end;
@ -69,12 +69,12 @@ list(#{topic := Topic0}, _Params) ->
list(_Bindings, _Params) ->
execute_when_enabled(fun() ->
case emqx_mgmt:get_all_topic_metrics() of
case get_all_topic_metrics() of
{error, Reason} -> return({error, Reason});
Metrics -> return({ok, Metrics})
end
end).
register(_Bindings, Params) ->
execute_when_enabled(fun() ->
case proplists:get_value(<<"topic">>, Params) of
@ -82,8 +82,8 @@ register(_Bindings, Params) ->
return({error, missing_required_params});
Topic ->
case safe_validate(Topic) of
true ->
emqx_mgmt:register_topic_metrics(Topic),
true ->
register_topic_metrics(Topic),
return(ok);
false ->
return({error, invalid_topic_name})
@ -93,7 +93,7 @@ register(_Bindings, Params) ->
unregister(Bindings, _Params) when map_size(Bindings) =:= 0 ->
execute_when_enabled(fun() ->
emqx_mgmt:unregister_all_topic_metrics(),
unregister_all_topic_metrics(),
return(ok)
end);
@ -101,8 +101,8 @@ unregister(#{topic := Topic0}, _Params) ->
execute_when_enabled(fun() ->
Topic = emqx_mgmt_util:urldecode(Topic0),
case safe_validate(Topic) of
true ->
emqx_mgmt:unregister_topic_metrics(Topic),
true ->
unregister_topic_metrics(Topic),
return(ok);
false ->
return({error, invalid_topic_name})
@ -128,3 +128,76 @@ safe_validate(Topic) ->
error:_Error ->
false
end.
get_all_topic_metrics() ->
lists:foldl(fun(Topic, Acc) ->
case get_topic_metrics(Topic) of
{error, _Reason} ->
Acc;
Metrics ->
[#{topic => Topic, metrics => Metrics} | Acc]
end
end, [], emqx_mod_topic_metrics:all_registered_topics()).
get_topic_metrics(Topic) ->
lists:foldl(fun(Node, Acc) ->
case get_topic_metrics(Node, Topic) of
{error, _Reason} ->
Acc;
Metrics ->
case Acc of
[] -> Metrics;
_ ->
lists:foldl(fun({K, V}, Acc0) ->
[{K, V + proplists:get_value(K, Metrics, 0)} | Acc0]
end, [], Acc)
end
end
end, [], ekka_mnesia:running_nodes()).
get_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:metrics(Topic);
get_topic_metrics(Node, Topic) ->
rpc_call(Node, get_topic_metrics, [Node, Topic]).
register_topic_metrics(Topic) ->
Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
register_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:register(Topic);
register_topic_metrics(Node, Topic) ->
rpc_call(Node, register_topic_metrics, [Node, Topic]).
unregister_topic_metrics(Topic) ->
Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
unregister_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:unregister(Topic);
unregister_topic_metrics(Node, Topic) ->
rpc_call(Node, unregister_topic_metrics, [Node, Topic]).
unregister_all_topic_metrics() ->
Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
unregister_all_topic_metrics(Node) when Node =:= node() ->
emqx_mod_topic_metrics:unregister_all();
unregister_all_topic_metrics(Node) ->
rpc_call(Node, unregister_topic_metrics, [Node]).
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.


@ -30,6 +30,8 @@
, load_module/2
]).
-export([cli/1]).
%% @doc List all available plugins
-spec(list() -> [{atom(), boolean()}]).
list() ->
@ -170,3 +172,44 @@ write_loaded(true) ->
ok
end;
write_loaded(false) -> ok.
%%--------------------------------------------------------------------
%% @doc Modules Command
cli(["list"]) ->
lists:foreach(fun({Name, Active}) ->
emqx_ctl:print("Module(~s, description=~s, active=~s)~n",
[Name, Name:description(), Active])
end, emqx_modules:list());
cli(["load", Name]) ->
case emqx_modules:load(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Module ~s loaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Load module ~s error: ~p.~n", [Name, Reason])
end;
cli(["unload", Name]) ->
case emqx_modules:unload(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Module ~s unloaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Unload module ~s error: ~p.~n", [Name, Reason])
end;
cli(["reload", "emqx_mod_acl_internal" = Name]) ->
case emqx_modules:reload(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Module ~s reloaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Reload module ~s error: ~p.~n", [Name, Reason])
end;
cli(["reload", Name]) ->
emqx_ctl:print("Module: ~p does not need to be reloaded.~n", [Name]);
cli(_) ->
emqx_ctl:usage([{"modules list", "Show loaded modules"},
{"modules load <Module>", "Load module"},
{"modules unload <Module>", "Unload module"},
{"modules reload <Module>", "Reload module"}
]).
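As a rough illustration (not part of the commit) of how the new command handler is exercised: once registered with emqx_ctl under the modules command, the clauses above can also be called directly, assuming the emqx_modules application is running:
%% Illustrative calls only; output goes through emqx_ctl:print/2 and emqx_ctl:usage/1.
_ = emqx_modules:cli(["list"]),
_ = emqx_modules:cli(["load", "emqx_mod_presence"]),
_ = emqx_modules:cli(["unload", "emqx_mod_presence"]).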


@ -14,9 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_modules).
-include("emqx_mgmt.hrl").
-module(emqx_modules_api).
-import(minirest, [return/1]).
@ -74,17 +72,21 @@
, reload/2
]).
-export([ do_load_module/2
, do_unload_module/2
]).
list(#{node := Node}, _Params) ->
return({ok, [format(Module) || Module <- emqx_mgmt:list_modules(Node)]});
return({ok, [format(Module) || Module <- list_modules(Node)]});
list(_Bindings, _Params) ->
return({ok, [format(Node, Modules) || {Node, Modules} <- emqx_mgmt:list_modules()]}).
return({ok, [format(Node, Modules) || {Node, Modules} <- list_modules()]}).
load(#{node := Node, module := Module}, _Params) ->
return(emqx_mgmt:load_module(Node, Module));
return(do_load_module(Node, Module));
load(#{module := Module}, _Params) ->
Results = [emqx_mgmt:load_module(Node, Module) || {Node, _Info} <- emqx_mgmt:list_nodes()],
Results = [do_load_module(Node, Module) || Node <- ekka_mnesia:running_nodes()],
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
[] ->
return(ok);
@ -93,10 +95,10 @@ load(#{module := Module}, _Params) ->
end.
unload(#{node := Node, module := Module}, _Params) ->
return(emqx_mgmt:unload_module(Node, Module));
return(do_unload_module(Node, Module));
unload(#{module := Module}, _Params) ->
Results = [emqx_mgmt:unload_module(Node, Module) || {Node, _Info} <- emqx_mgmt:list_nodes()],
Results = [do_unload_module(Node, Module) || Node <- ekka_mnesia:running_nodes()],
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
[] ->
return(ok);
@ -105,13 +107,13 @@ unload(#{module := Module}, _Params) ->
end.
reload(#{node := Node, module := Module}, _Params) ->
case emqx_mgmt:reload_module(Node, Module) of
case reload_module(Node, Module) of
ignore -> return(ok);
Result -> return(Result)
end;
reload(#{module := Module}, _Params) ->
Results = [emqx_mgmt:reload_module(Node, Module) || {Node, _Info} <- emqx_mgmt:list_nodes()],
Results = [reload_module(Node, Module) || Node <- ekka_mnesia:running_nodes()],
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
[] ->
return(ok);
@ -119,6 +121,10 @@ reload(#{module := Module}, _Params) ->
return(lists:last(Errors))
end.
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
format(Node, Modules) ->
#{node => Node, modules => [format(Module) || Module <- Modules]}.
@ -127,3 +133,31 @@ format({Name, Active}) ->
description => iolist_to_binary(Name:description()),
active => Active}.
list_modules() ->
[{Node, list_modules(Node)} || Node <- ekka_mnesia:running_nodes()].
list_modules(Node) when Node =:= node() ->
emqx_modules:list();
list_modules(Node) ->
rpc_call(Node, list_modules, [Node]).
do_load_module(Node, Module) when Node =:= node() ->
emqx_modules:load(Module);
do_load_module(Node, Module) ->
rpc_call(Node, do_load_module, [Node, Module]).
do_unload_module(Node, Module) when Node =:= node() ->
emqx_modules:unload(Module);
do_unload_module(Node, Module) ->
rpc_call(Node, do_unload_module, [Node, Module]).
reload_module(Node, Module) when Node =:= node() ->
emqx_modules:reload(Module);
reload_module(Node, Module) ->
rpc_call(Node, reload_module, [Node, Module]).
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.


@ -18,8 +18,6 @@
-behaviour(application).
-emqx_plugin(?MODULE).
-export([start/2]).
-export([stop/1]).
@ -27,10 +25,12 @@
start(_Type, _Args) ->
% the configs for emqx_modules is so far still in emqx application
% Ensure it's loaded
application:load(emqx),
_ = application:load(emqx),
{ok, Pid} = emqx_mod_sup:start_link(),
ok = emqx_modules:load(),
emqx_ctl:register_command(modules, {emqx_modules, cli}, []),
{ok, Pid}.
stop(_State) ->
emqx_ctl:unregister_command(modules),
emqx_modules:unload().


@ -21,10 +21,19 @@
-include_lib("eunit/include/eunit.hrl").
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
-define(HOST, "http://127.0.0.1:8081/").
-define(API_VERSION, "v4").
-define(BASE_PATH, "api").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_modules], fun set_sepecial_cfg/1),
emqx_ct_helpers:start_apps([emqx_management, emqx_modules], fun set_sepecial_cfg/1),
emqx_ct_http:create_default_app(),
Config.
set_sepecial_cfg(_) ->
@ -32,7 +41,8 @@ set_sepecial_cfg(_) ->
ok.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_modules]).
emqx_ct_http:delete_default_app(),
emqx_ct_helpers:stop_apps([emqx_modules, emqx_management]).
t_load(_) ->
?assertEqual(ok, emqx_modules:unload()),
@ -45,3 +55,136 @@ t_load(_) ->
t_list(_) ->
?assertMatch([{_, _} | _ ], emqx_modules:list()).
t_modules_api(_) ->
emqx_modules:load_module(emqx_mod_presence, false),
timer:sleep(50),
{ok, Modules1} = request_api(get, api_path(["modules"]), auth_header_()),
[Modules11] = filter(get(<<"data">>, Modules1), <<"node">>, atom_to_binary(node(), utf8)),
[Module1] = filter(maps:get(<<"modules">>, Modules11), <<"name">>, <<"emqx_mod_presence">>),
?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module1)),
?assertEqual(true, maps:get(<<"active">>, Module1)),
{ok, _} = request_api(put,
api_path(["modules",
atom_to_list(emqx_mod_presence),
"unload"]),
auth_header_()),
{ok, Error1} = request_api(put,
api_path(["modules",
atom_to_list(emqx_mod_presence),
"unload"]),
auth_header_()),
?assertEqual(<<"not_started">>, get(<<"message">>, Error1)),
{ok, Modules2} = request_api(get,
api_path(["nodes", atom_to_list(node()), "modules"]),
auth_header_()),
[Module2] = filter(get(<<"data">>, Modules2), <<"name">>, <<"emqx_mod_presence">>),
?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module2)),
?assertEqual(false, maps:get(<<"active">>, Module2)),
{ok, _} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(emqx_mod_presence),
"load"]),
auth_header_()),
{ok, Modules3} = request_api(get,
api_path(["nodes", atom_to_list(node()), "modules"]),
auth_header_()),
[Module3] = filter(get(<<"data">>, Modules3), <<"name">>, <<"emqx_mod_presence">>),
?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module3)),
?assertEqual(true, maps:get(<<"active">>, Module3)),
{ok, _} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(emqx_mod_presence),
"unload"]),
auth_header_()),
{ok, Error2} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(emqx_mod_presence),
"unload"]),
auth_header_()),
?assertEqual(<<"not_started">>, get(<<"message">>, Error2)),
emqx_modules:unload(emqx_mod_presence).
t_modules_cmd(_) ->
mock_print(),
meck:new(emqx_modules, [non_strict, passthrough]),
meck:expect(emqx_modules, load, fun(_) -> ok end),
meck:expect(emqx_modules, unload, fun(_) -> ok end),
meck:expect(emqx_modules, reload, fun(_) -> ok end),
?assertEqual(emqx_modules:cli(["list"]), ok),
?assertEqual(emqx_modules:cli(["load", "emqx_mod_presence"]),
"Module emqx_mod_presence loaded successfully.\n"),
?assertEqual(emqx_modules:cli(["unload", "emqx_mod_presence"]),
"Module emqx_mod_presence unloaded successfully.\n"),
unmock_print().
mock_print() ->
catch meck:unload(emqx_ctl),
meck:new(emqx_ctl, [non_strict, passthrough]),
meck:expect(emqx_ctl, print, fun(Arg) -> emqx_ctl:format(Arg) end),
meck:expect(emqx_ctl, print, fun(Msg, Arg) -> emqx_ctl:format(Msg, Arg) end),
meck:expect(emqx_ctl, usage, fun(Usages) -> emqx_ctl:format_usage(Usages) end),
meck:expect(emqx_ctl, usage, fun(Cmd, Descr) -> emqx_ctl:format_usage(Cmd, Descr) end).
unmock_print() ->
meck:unload(emqx_ctl).
get(Key, ResponseBody) ->
maps:get(Key, jiffy:decode(list_to_binary(ResponseBody), [return_maps])).
request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []).
request_api(Method, Url, QueryParams, Auth) ->
request_api(Method, Url, QueryParams, Auth, []).
request_api(Method, Url, QueryParams, Auth, []) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth]});
request_api(Method, Url, QueryParams, Auth, Body) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
do_request_api(Method, Request)->
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
when Code =:= 200 orelse Code =:= 201 ->
{ok, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
auth_header_() ->
AppId = <<"admin">>,
AppSecret = <<"public">>,
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
{"Authorization","Basic " ++ Encoded}.
api_path(Parts)->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).
filter(List, Key, Value) ->
lists:filter(fun(Item) ->
maps:get(Key, Item) == Value
end, List).


@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -e -u
# This script prints the release version for emqx


@ -449,7 +449,7 @@ end}.
%%--------------------------------------------------------------------
{mapping, "log.to", "kernel.logger", [
{default, console},
{default, file},
{datatype, {enum, [off, file, console, both]}}
]}.


@ -34,7 +34,7 @@
{post_hooks,[]}.
{erl_first_files, ["src/emqx_logger.erl"]}.
{erl_first_files, ["src/emqx_logger.erl", "src/emqx_rule_actions_trans.erl"]}.
{deps,
[ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
@ -46,7 +46,7 @@
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {branch, "hocon"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.3"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.0"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.1"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}}


@ -2,11 +2,10 @@
-export([do/2]).
do(Dir, CONFIG) ->
ok = compile_and_load_pase_transforms(Dir),
do(_Dir, CONFIG) ->
C1 = deps(CONFIG),
Config = dialyzer(C1),
dump(Config ++ [{overrides, overrides()}] ++ coveralls() ++ config()).
maybe_dump(Config ++ [{overrides, overrides()}] ++ coveralls() ++ config()).
bcrypt() ->
{bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}.
@ -23,6 +22,7 @@ overrides() ->
[ {add, [ {extra_src_dirs, [{"etc", [{recursive,true}]}]}
, {erl_opts, [ deterministic
, {compile_info, [{emqx_vsn, get_vsn()}]}
| [{d, 'EMQX_ENTERPRISE'} || is_enterprise()]
]}
]}
].
@ -33,9 +33,11 @@ config() ->
, {project_app_dirs, project_app_dirs()}
].
is_enterprise() ->
filelib:is_regular("EMQX_ENTERPRISE").
extra_lib_dir() ->
EnterpriseFlag = os:getenv("EMQX_ENTERPRISE"),
case EnterpriseFlag =:= "true" orelse EnterpriseFlag =:= "1" of
case is_enterprise() of
true -> "lib-ee";
false -> "lib-ce"
end.
@ -62,6 +64,7 @@ test_deps() ->
common_compile_opts() ->
[ deterministic
, {compile_info, [{emqx_vsn, get_vsn()}]}
| [{d, 'EMQX_ENTERPRISE'} || is_enterprise()]
].
prod_compile_opts() ->
@ -149,6 +152,7 @@ relx_apps(ReleaseType) ->
, {mnesia, load}
, {ekka, load}
, {emqx_plugin_libs, load}
, emqx_modules
]
++ [bcrypt || provide_bcrypt_release(ReleaseType)]
++ relx_apps_per_rel(ReleaseType)
@ -180,7 +184,9 @@ relx_plugin_apps(ReleaseType) ->
, emqx_sasl
, emqx_telemetry
, emqx_modules
] ++ relx_plugin_apps_per_rel(ReleaseType).
]
++ relx_plugin_apps_per_rel(ReleaseType)
++ relx_plugin_apps_enterprise(is_enterprise()).
relx_plugin_apps_per_rel(cloud) ->
[ emqx_lwm2m
@ -197,6 +203,11 @@ relx_plugin_apps_per_rel(cloud) ->
relx_plugin_apps_per_rel(edge) ->
[].
relx_plugin_apps_enterprise(true) ->
[list_to_atom(A) || A <- filelib:wildcard("*", "lib-ee"),
filelib:is_dir(filename:join(["lib-ee", A]))];
relx_plugin_apps_enterprise(false) -> [].
relx_overlay(ReleaseType) ->
[ {mkdir,"log/"}
, {mkdir,"data/"}
@ -244,11 +255,11 @@ extra_overlay(edge) ->
[].
emqx_etc_overlay(cloud) ->
emqx_etc_overlay_common() ++
[ {"etc/emqx_cloud.d/vm.args","etc/vm.args"}
[ {"etc/emqx_cloud/vm.args","etc/vm.args"}
];
emqx_etc_overlay(edge) ->
emqx_etc_overlay_common() ++
[ {"etc/emqx_edge.d/vm.args","etc/vm.args"}
[ {"etc/emqx_edge/vm.args","etc/vm.args"}
].
emqx_etc_overlay_common() ->
@ -286,10 +297,19 @@ get_vsn() ->
Vsn2 = re:replace(PkgVsn, "v", "", [{return ,list}]),
re:replace(Vsn2, "\n", "", [{return ,list}]).
dump(Config) ->
file:write_file("rebar.config.rendered", [io_lib:format("~p.\n", [I]) || I <- Config]),
maybe_dump(Config) ->
is_debug() andalso file:write_file("rebar.config.rendered", [io_lib:format("~p.\n", [I]) || I <- Config]),
Config.
is_debug() -> is_debug("DEBUG") orelse is_debug("DIAGNOSTIC").
is_debug(VarName) ->
case os:getenv(VarName) of
false -> false;
"" -> false;
_ -> true
end.
provide_bcrypt_dep() ->
case os:type() of
{win32, _} -> false;
@ -299,20 +319,6 @@ provide_bcrypt_dep() ->
provide_bcrypt_release(ReleaseType) ->
provide_bcrypt_dep() andalso ReleaseType =:= cloud.
%% this is a silly but working patch.
%% rebar3 does not handle umbrella project's cross-app parse_transform well
compile_and_load_pase_transforms(Dir) ->
PtFiles =
[ "apps/emqx_rule_engine/src/emqx_rule_actions_trans.erl"
],
CompileOpts = [verbose,report_errors,report_warnings,return_errors,debug_info],
lists:foreach(fun(PtFile) -> {ok, _Mod} = compile:file(path(Dir, PtFile), CompileOpts) end, PtFiles).
path(Dir, Path) -> str(filename:join([Dir, Path])).
str(L) when is_list(L) -> L;
str(B) when is_binary(B) -> unicode:characters_to_list(B, utf8).
erl_opts_i() ->
[{i, "apps"}] ++
[{i, Dir} || Dir <- filelib:wildcard(filename:join(["apps", "*", "include"]))] ++


@ -0,0 +1,63 @@
#!/usr/bin/env escript
%% NOTE: this script should be executed at project root.
-mode(compile).
main([]) ->
AppsDir = case filelib:is_file("EMQX_ENTERPRISE") of
true -> "lib-ee";
false -> "lib-ce"
end,
true = filelib:is_dir(AppsDir),
Files = ["rebar.config"] ++
apps_rebar_config("apps") ++
apps_rebar_config(AppsDir),
Deps = collect_deps(Files, #{}),
case count_bad_deps(Deps) of
0 ->
io:format("OK~n");
N ->
io:format(standard_error, "~p dependency discrepancies", [N]),
halt(1)
end.
apps_rebar_config(Dir) ->
filelib:wildcard(filename:join([Dir, "*", "rebar.config"])).
%% collect a kv-list of {DepName, [{DepReference, RebarConfigFile}]}
%% the value part should have unique DepReference
collect_deps([], Acc) -> maps:to_list(Acc);
collect_deps([File | Files], Acc) ->
Deps =
try
{ok, Config} = file:consult(File),
{deps, Deps0} = lists:keyfind(deps, 1, Config),
Deps0
catch
C : E : St ->
erlang:raise(C, {E, {failed_to_find_deps_in_rebar_config, File}}, St)
end,
collect_deps(Files, do_collect_deps(Deps, File, Acc)).
do_collect_deps([], _File, Acc) -> Acc;
do_collect_deps([{Name, Ref} | Deps], File, Acc) ->
Refs = maps:get(Name, Acc, []),
do_collect_deps(Deps, File, Acc#{Name => [{Ref, File} | Refs]}).
count_bad_deps([]) -> 0;
count_bad_deps([{Name, Refs0} | Rest]) ->
Refs = lists:keysort(1, Refs0),
case is_unique_ref(Refs) of
true ->
count_bad_deps(Rest);
false ->
io:format(standard_error, "~p:~n~p~n", [Name, Refs]),
1 + count_bad_deps(Rest)
end.
is_unique_ref([_]) -> true;
is_unique_ref([{Ref, _File1}, {Ref, File2} | Rest]) ->
is_unique_ref([{Ref, File2} | Rest]);
is_unique_ref(_) ->
false.
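As a small, hypothetical illustration of the kv-list shape described in the comment above (not produced by an actual run of this script): a discrepancy is simply one dependency name mapped to more than one distinct reference, e.g.
%% Hypothetical entry; the second rebar.config path is made up for the example.
{pbkdf2, [{{git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}, "rebar.config"},
          {{git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.3"}}, "apps/emqx_auth_mnesia/rebar.config"}]}
count_bad_deps/1 would report such an entry on standard error and the script would exit non-zero.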


@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
## This script checks style of changed files.
## Expect argument 1 to be the git compare base.


@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -euo pipefail


@ -1,22 +1,21 @@
#!/bin/bash
#!/usr/bin/env bash
set -euo pipefail
# ensure dir
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
if [[ "$1" == https://* ]]; then
VERSION='*' # always download
DOWNLOAD_URL="$1"
else
VERSION="$1"
DOWNLOAD_URL="https://github.com/emqx/emqx-dashboard-frontend/releases/download/${VERSION}/emqx-dashboard.zip"
fi
VERSION="$1"
RELEASE_ASSET_FILE="emqx-dashboard.zip"
if [ "${EMQX_ENTERPRISE:-}" = 'true' ] || [ "${EMQX_ENTERPRISE:-}" == '1' ]; then
if [ -f 'EMQX_ENTERPRISE' ]; then
DASHBOARD_PATH='lib-ee/emqx_dashboard/priv'
DASHBOARD_REPO='emqx-enterprise-dashboard-frontend-src'
AUTH="Authorization: token $(cat scripts/git-token)"
else
DASHBOARD_PATH='lib-ce/emqx_dashboard/priv'
DASHBOARD_REPO='emqx-dashboard-frontend'
AUTH=""
fi
case $(uname) in
@ -32,8 +31,27 @@ if [ -d "$DASHBOARD_PATH/www" ] && [ "$(version)" = "$VERSION" ]; then
exit 0
fi
curl -f -L "${DOWNLOAD_URL}" -o ./emqx-dashboard.zip
unzip -q ./emqx-dashboard.zip -d "$DASHBOARD_PATH"
get_assets(){
# Get the download URL of our desired asset
download_url="$(curl --silent --show-error \
--header "${AUTH}" \
--header "Accept: application/vnd.github.v3+json" \
"https://api.github.com/repos/emqx/${DASHBOARD_REPO}/releases/tags/${VERSION}" \
| jq --raw-output ".assets[] | select(.name==\"${RELEASE_ASSET_FILE}\").url")"
# Get GitHub's S3 redirect URL
redirect_url=$(curl --silent --show-error \
--header "${AUTH}" \
--header "Accept: application/octet-stream" \
--write-out "%{redirect_url}" \
"$download_url")
curl --silent --show-error \
--header "Accept: application/octet-stream" \
--output "${RELEASE_ASSET_FILE}" \
"$redirect_url"
}
get_assets
unzip -q "$RELEASE_ASSET_FILE" -d "$DASHBOARD_PATH"
rm -rf "$DASHBOARD_PATH/www"
mv "$DASHBOARD_PATH/dist" "$DASHBOARD_PATH/www"
rm -rf emqx-dashboard.zip
rm -rf "$RELEASE_ASSET_FILE"


@ -1,4 +1,5 @@
#!/bin/bash
#!/usr/bin/env bash
set -euo pipefail
target_files=()


@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -euo pipefail


@ -1,7 +1,7 @@
{application, emqx,
[{description, "EMQ X Broker"},
{id, "emqx"},
{vsn, "4.3-alpha.1"}, % strict semver, bump manually!
{vsn, "4.3-beta.1"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},


@ -228,6 +228,9 @@ setting_peercert_infos(Peercert, ClientInfo, Options) ->
ClientId = peer_cert_as(peer_cert_as_clientid, Options, Peercert, DN, CN),
ClientInfo#{username => Username, clientid => ClientId, dn => DN, cn => CN}.
-dialyzer([{nowarn_function, [peer_cert_as/5]}]).
% esockd_peercert:peercert is opaque
% https://github.com/emqx/esockd/blob/master/src/esockd_peercert.erl
peer_cert_as(Key, Options, Peercert, DN, CN) ->
case proplists:get_value(Key, Options) of
cn -> CN;
@ -501,8 +504,10 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
ignore ->
case QoS of
?QOS_0 -> {ok, NChannel};
_ ->
handle_out(puback, {PacketId, Rc}, NChannel)
?QOS_1 ->
handle_out(puback, {PacketId, Rc}, NChannel);
?QOS_2 ->
handle_out(pubrec, {PacketId, Rc}, NChannel)
end;
disconnect ->
handle_out(disconnect, Rc, NChannel)

69
src/emqx_http_lib.erl Normal file

@ -0,0 +1,69 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_http_lib).
-export([uri_encode/1, uri_decode/1]).
%% @doc Decode percent-encoded URI.
%% This is copied from http_uri.erl which has been deprecated since OTP-23
%% The recommended replacement uri_string function is not quite equivalent
%% and not backward compatible.
-spec uri_decode(binary()) -> binary().
uri_decode(<<$%, Hex:2/binary, Rest/bits>>) ->
<<(binary_to_integer(Hex, 16)), (uri_decode(Rest))/binary>>;
uri_decode(<<First:1/binary, Rest/bits>>) ->
<<First/binary, (uri_decode(Rest))/binary>>;
uri_decode(<<>>) ->
<<>>.
%% @doc Encode URI.
-spec uri_encode(binary()) -> binary().
uri_encode(URI) when is_binary(URI) ->
<< <<(uri_encode_binary(Char))/binary>> || <<Char>> <= URI >>.
uri_encode_binary(Char) ->
case reserved(Char) of
true ->
<< $%, (integer_to_binary(Char, 16))/binary >>;
false ->
<<Char>>
end.
reserved($;) -> true;
reserved($:) -> true;
reserved($@) -> true;
reserved($&) -> true;
reserved($=) -> true;
reserved($+) -> true;
reserved($,) -> true;
reserved($/) -> true;
reserved($?) -> true;
reserved($#) -> true;
reserved($[) -> true;
reserved($]) -> true;
reserved($<) -> true;
reserved($>) -> true;
reserved($\") -> true;
reserved(${) -> true;
reserved($}) -> true;
reserved($|) -> true;
reserved($\\) -> true;
reserved($') -> true;
reserved($^) -> true;
reserved($%) -> true;
reserved($\s) -> true;
reserved(_) -> false.
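A brief round-trip sketch for the new helpers (editorial illustration, not part of the added file): reserved characters such as "/" and the space are percent-encoded with upper-case hex, everything else passes through unchanged.
%% Based only on the clauses above: $/ (16#2F) and the space (16#20) are reserved.
<<"a%2Fb%20c">> = emqx_http_lib:uri_encode(<<"a/b c">>),
<<"a/b c">> = emqx_http_lib:uri_decode(<<"a%2Fb%20c">>).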


@ -33,21 +33,21 @@
-define(TRACER, ?MODULE).
-define(FORMAT, {emqx_logger_formatter,
#{template =>
[time," [",level,"] ",
[time, " [", level, "] ",
{clientid,
[{peername,
[clientid,"@",peername," "],
[clientid, "@", peername, " "],
[clientid, " "]}],
[{peername,
[peername," "],
[peername, " "],
[]}]},
msg,"\n"]}}).
msg, "\n"]}}).
-define(TOPIC_TRACE_ID(T), "trace_topic_"++T).
-define(CLIENT_TRACE_ID(C), "trace_clientid_"++C).
-define(TOPIC_TRACE(T), {topic,T}).
-define(CLIENT_TRACE(C), {clientid,C}).
-define(TOPIC_TRACE(T), {topic, T}).
-define(CLIENT_TRACE(C), {clientid, C}).
-define(is_log_level(L),
-define(IS_LOG_LEVEL(L),
L =:= emergency orelse
L =:= alert orelse
L =:= critical orelse
@ -67,19 +67,24 @@ trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
ignore;
trace(publish, #message{from = From, topic = Topic, payload = Payload})
when is_binary(From); is_atom(From) ->
emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~0p", [Topic, Payload]).
emqx_logger:info(#{topic => Topic,
mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} },
"PUBLISH to ~s: ~0p", [Topic, Payload]).
%% @doc Start to trace clientid or topic.
-spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}).
start_trace(Who, all, LogFile) ->
start_trace(Who, debug, LogFile);
start_trace(Who, Level, LogFile) ->
case ?is_log_level(Level) of
case ?IS_LOG_LEVEL(Level) of
true ->
#{level := PrimaryLevel} = logger:get_primary_config(),
try logger:compare_levels(Level, PrimaryLevel) of
lt ->
{error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
{error,
io_lib:format("Cannot trace at a log level (~s) "
"lower than the primary log level (~s)",
[Level, PrimaryLevel])};
_GtOrEq ->
install_trace_handler(Who, Level, LogFile)
catch
@ -103,7 +108,6 @@ install_trace_handler(Who, Level, LogFile) ->
case logger:add_handler(handler_id(Who), logger_disk_log_h,
#{level => Level,
formatter => ?FORMAT,
filesync_repeat_interval => no_repeat,
config => #{type => halt, file => LogFile},
filter_default => stop,
filters => [{meta_key_filter,
@ -128,9 +132,9 @@ uninstall_trance_handler(Who) ->
filter_traces(#{id := Id, level := Level, dst := Dst}, Acc) ->
case atom_to_list(Id) of
?TOPIC_TRACE_ID(T)->
[{?TOPIC_TRACE(T), {Level,Dst}} | Acc];
[{?TOPIC_TRACE(T), {Level, Dst}} | Acc];
?CLIENT_TRACE_ID(C) ->
[{?CLIENT_TRACE(C), {Level,Dst}} | Acc];
[{?CLIENT_TRACE(C), {Level, Dst}} | Acc];
_ -> Acc
end.


@ -0,0 +1,46 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_http_lib_tests).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
uri_encode_decode_test_() ->
Opts = [{numtests, 1000}, {to_file, user}],
{timeout, 10,
fun() -> ?assert(proper:quickcheck(prop_run(), Opts)) end}.
prop_run() ->
?FORALL(Generated, prop_uri(), test_prop_uri(iolist_to_binary(Generated))).
prop_uri() ->
proper_types:non_empty(proper_types:list(proper_types:union([prop_char(), prop_reserved()]))).
prop_char() -> proper_types:integer(32, 126).
prop_reserved() ->
proper_types:oneof([$;, $:, $@, $&, $=, $+, $,, $/, $?,
$#, $[, $], $<, $>, $\", ${, $}, $|,
$\\, $', $^, $%, $ ]).
test_prop_uri(URI) ->
Encoded = emqx_http_lib:uri_encode(URI),
Decoded1 = emqx_http_lib:uri_decode(Encoded),
?assertEqual(URI, Decoded1),
Decoded2 = uri_string:percent_decode(Encoded),
?assertEqual(URI, Decoded2),
true.