Merge branch 'master' into EMQX-871-872

This commit is contained in:
x1001100011 2021-08-11 21:54:59 -07:00
commit 5dd7f53662
46 changed files with 1793 additions and 489 deletions

View File

@ -1,11 +1,10 @@
#!/bin/bash #!/bin/bash
set -x -e -u set -x -e -u
export DEBUG=1
export CODE_PATH=${CODE_PATH:-"/emqx"} export CODE_PATH=${CODE_PATH:-"/emqx"}
export EMQX_NAME=${EMQX_NAME:-"emqx"} export EMQX_NAME=${EMQX_NAME:-"emqx"}
export PACKAGE_PATH="${CODE_PATH}/_packages/${EMQX_NAME}" export PACKAGE_PATH="${CODE_PATH}/_packages/${EMQX_NAME}"
export RELUP_PACKAGE_PATH="${CODE_PATH}/_upgrade_base" export RELUP_PACKAGE_PATH="${CODE_PATH}/_upgrade_base"
# export EMQX_NODE_NAME="emqx-on-$(uname -m)@127.0.0.1"
# export EMQX_NODE_COOKIE=$(date +%s%N)
case "$(uname -m)" in case "$(uname -m)" in
x86_64) x86_64)
@ -122,6 +121,9 @@ run_test(){
tee -a "$emqx_env_vars" <<EOF tee -a "$emqx_env_vars" <<EOF
export EMQX_ZONE__EXTERNAL__SERVER_KEEPALIVE=60 export EMQX_ZONE__EXTERNAL__SERVER_KEEPALIVE=60
export EMQX_MQTT__MAX_TOPIC_ALIAS=10 export EMQX_MQTT__MAX_TOPIC_ALIAS=10
export EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug
export EMQX_LOG__FILE_HANDLERS__EMQX_LOG__LEVEL=debug
export EMQX_LOG__PRIMARY_LEVEL=debug
EOF EOF
## for ARM, due to CI env issue, skip start of quic listener for the moment ## for ARM, due to CI env issue, skip start of quic listener for the moment
[[ $(arch) == *arm* || $(arch) == aarch64 ]] && tee -a "$emqx_env_vars" <<EOF [[ $(arch) == *arm* || $(arch) == aarch64 ]] && tee -a "$emqx_env_vars" <<EOF
@ -151,6 +153,8 @@ EOF
# shellcheck disable=SC2009 # pgrep does not support Extended Regular Expressions # shellcheck disable=SC2009 # pgrep does not support Extended Regular Expressions
ps -ef | grep -E '\-progname\s.+emqx\s' ps -ef | grep -E '\-progname\s.+emqx\s'
if ! emqx 'stop'; then if ! emqx 'stop'; then
# shellcheck disable=SC2009 # pgrep does not support Extended Regular Expressions
ps -ef | grep -E '\-progname\s.+emqx\s'
echo "ERROR: failed_to_stop_emqx_with_the_stop_command" echo "ERROR: failed_to_stop_emqx_with_the_stop_command"
cat /var/log/emqx/erlang.log.1 || true cat /var/log/emqx/erlang.log.1 || true
cat /var/log/emqx/emqx.log.1 || true cat /var/log/emqx/emqx.log.1 || true

View File

@ -3,7 +3,7 @@ version: '3.9'
services: services:
haproxy: haproxy:
container_name: haproxy container_name: haproxy
image: haproxy:2.3 image: haproxy:2.4
depends_on: depends_on:
- emqx1 - emqx1
- emqx2 - emqx2
@ -23,7 +23,8 @@ services:
- bash - bash
- -c - -c
- | - |
cat /usr/local/etc/haproxy/certs/cert.pem /usr/local/etc/haproxy/certs/key.pem > /usr/local/etc/haproxy/certs/emqx.pem set -x
cat /usr/local/etc/haproxy/certs/cert.pem /usr/local/etc/haproxy/certs/key.pem > /tmp/emqx.pem
haproxy -f /usr/local/etc/haproxy/haproxy.cfg haproxy -f /usr/local/etc/haproxy/haproxy.cfg
emqx1: emqx1:

View File

@ -1,8 +1,8 @@
version: '3.9' version: '3.9'
services: services:
erlang: erlang23:
container_name: erlang container_name: erlang23
image: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 image: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
env_file: env_file:
- conf.env - conf.env
@ -21,6 +21,26 @@ services:
working_dir: /emqx working_dir: /emqx
tty: true tty: true
erlang24:
container_name: erlang24
image: emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04
env_file:
- conf.env
environment:
GITHUB_ACTIONS: ${GITHUB_ACTIONS}
GITHUB_TOKEN: ${GITHUB_TOKEN}
GITHUB_RUN_ID: ${GITHUB_RUN_ID}
GITHUB_SHA: ${GITHUB_SHA}
GITHUB_RUN_NUMBER: ${GITHUB_RUN_NUMBER}
GITHUB_EVENT_NAME: ${GITHUB_EVENT_NAME}
GITHUB_REF: ${GITHUB_REF}
networks:
- emqx_bridge
volumes:
- ../..:/emqx
working_dir: /emqx
tty: true
networks: networks:
emqx_bridge: emqx_bridge:
driver: bridge driver: bridge

View File

@ -11,6 +11,7 @@ global
tune.ssl.default-dh-param 2048 tune.ssl.default-dh-param 2048
ssl-default-bind-ciphers ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:kEDH+AESGCM:ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA:ECDHE-ECDSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA256:DHE-RSA-AES256-SHA256:DHE-DSS-AES256-SHA:DHE-RSA-AES256-SHA:ECDHE-RSA-DES-CBC3-SHA:ECDHE-ECDSA-DES-CBC3-SHA:EDH-RSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:AES:DES-CBC3-SHA:HIGH:SEED:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!RSAPSK:!aDH:!aECDH:!EDH-DSS-DES-CBC3-SHA:!KRB5-DES-CBC3-SHA:!SRP ssl-default-bind-ciphers ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:kEDH+AESGCM:ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA:ECDHE-ECDSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA256:DHE-RSA-AES256-SHA256:DHE-DSS-AES256-SHA:DHE-RSA-AES256-SHA:ECDHE-RSA-DES-CBC3-SHA:ECDHE-ECDSA-DES-CBC3-SHA:EDH-RSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:AES:DES-CBC3-SHA:HIGH:SEED:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!RSAPSK:!aDH:!aECDH:!EDH-DSS-DES-CBC3-SHA:!KRB5-DES-CBC3-SHA:!SRP
# Enable the HAProxy Runtime API # Enable the HAProxy Runtime API
# e.g. echo "show table emqx_tcp_back" | sudo socat stdio tcp4-connect:172.100.239.4:9999
stats socket :9999 level admin expose-fd listeners stats socket :9999 level admin expose-fd listeners
##---------------------------------------------------------------- ##----------------------------------------------------------------
@ -61,6 +62,8 @@ frontend emqx_tcp
mode tcp mode tcp
option tcplog option tcplog
bind *:1883 bind *:1883
# Reject connections that have an invalid MQTT packet
# tcp-request content reject unless { req.payload(0,0), mqtt_is_valid }
default_backend emqx_tcp_back default_backend emqx_tcp_back
frontend emqx_ws frontend emqx_ws
@ -71,7 +74,13 @@ frontend emqx_ws
backend emqx_tcp_back backend emqx_tcp_back
mode tcp mode tcp
balance static-rr
# Create a stick table for session persistence
stick-table type string len 32 size 100k expire 30m
# Use ClientID / client_identifier as persistence key
stick on req.payload(0,0),mqtt_field_value(connect,client_identifier)
server emqx-1 node1.emqx.io:1883 check-send-proxy send-proxy-v2 server emqx-1 node1.emqx.io:1883 check-send-proxy send-proxy-v2
server emqx-2 node2.emqx.io:1883 check-send-proxy send-proxy-v2 server emqx-2 node2.emqx.io:1883 check-send-proxy send-proxy-v2
@ -87,13 +96,13 @@ backend emqx_ws_back
frontend emqx_ssl frontend emqx_ssl
mode tcp mode tcp
option tcplog option tcplog
bind *:8883 ssl crt /usr/local/etc/haproxy/certs/emqx.pem ca-file /usr/local/etc/haproxy/certs/cacert.pem verify required no-sslv3 bind *:8883 ssl crt /tmp/emqx.pem ca-file /usr/local/etc/haproxy/certs/cacert.pem verify required no-sslv3
default_backend emqx_ssl_back default_backend emqx_ssl_back
frontend emqx_wss frontend emqx_wss
mode tcp mode tcp
option tcplog option tcplog
bind *:8084 ssl crt /usr/local/etc/haproxy/certs/emqx.pem ca-file /usr/local/etc/haproxy/certs/cacert.pem verify required no-sslv3 bind *:8084 ssl crt /tmp/emqx.pem ca-file /usr/local/etc/haproxy/certs/cacert.pem verify required no-sslv3
default_backend emqx_wss_back default_backend emqx_wss_back
backend emqx_ssl_back backend emqx_ssl_back

View File

@ -10,8 +10,14 @@ on:
jobs: jobs:
prepare: prepare:
strategy:
matrix:
container:
- "emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04"
- "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04"
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 container: ${{ matrix.container }}
outputs: outputs:
profiles: ${{ steps.set_profile.outputs.profiles}} profiles: ${{ steps.set_profile.outputs.profiles}}
@ -79,7 +85,7 @@ jobs:
- uses: gleam-lang/setup-erlang@v1.1.0 - uses: gleam-lang/setup-erlang@v1.1.0
id: install_erlang id: install_erlang
with: with:
otp-version: 23.2 otp-version: 24.0.5
- name: build - name: build
env: env:
PYTHON: python PYTHON: python
@ -135,7 +141,7 @@ jobs:
matrix: matrix:
profile: ${{fromJSON(needs.prepare.outputs.profiles)}} profile: ${{fromJSON(needs.prepare.outputs.profiles)}}
erl_otp: erl_otp:
- 23.2.7.2-emqx-2 - 24.0.5-emqx-1
exclude: exclude:
- profile: emqx-edge - profile: emqx-edge
@ -213,6 +219,9 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
profile: ${{fromJSON(needs.prepare.outputs.profiles)}} profile: ${{fromJSON(needs.prepare.outputs.profiles)}}
erl_otp:
- 23.2.7.2-emqx-2
- 24.0.5-emqx-1
arch: arch:
- amd64 - amd64
- arm64 - arm64
@ -228,8 +237,6 @@ jobs:
- centos6 - centos6
- raspbian10 - raspbian10
# - raspbian9 # - raspbian9
erl_otp:
- 23.2.7.2-emqx-2
exclude: exclude:
- os: centos6 - os: centos6
arch: arm64 arch: arm64
@ -332,7 +339,7 @@ jobs:
matrix: matrix:
profile: ${{fromJSON(needs.prepare.outputs.profiles)}} profile: ${{fromJSON(needs.prepare.outputs.profiles)}}
erl_otp: erl_otp:
- 23.2.7.2-emqx-2 - 24.0.5-emqx-1
steps: steps:
- uses: actions/download-artifact@v2 - uses: actions/download-artifact@v2

View File

@ -15,7 +15,8 @@ jobs:
strategy: strategy:
matrix: matrix:
erl_otp: erl_otp:
- erl23.2.7.2-emqx-2 - erl24.0.5-emqx-1
os: os:
- ubuntu20.04 - ubuntu20.04
- centos7 - centos7
@ -43,7 +44,7 @@ jobs:
with: with:
name: rebar3.crashdump name: rebar3.crashdump
path: ./rebar3.crashdump path: ./rebar3.crashdump
- name: pakcages test - name: packages test
run: | run: |
export CODE_PATH=$GITHUB_WORKSPACE export CODE_PATH=$GITHUB_WORKSPACE
.ci/build_packages/tests.sh .ci/build_packages/tests.sh
@ -58,7 +59,7 @@ jobs:
strategy: strategy:
matrix: matrix:
erl_otp: erl_otp:
- 23.2.7.2-emqx-2 - 24.0.5-emqx-1
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1

View File

@ -4,8 +4,13 @@ on: [pull_request]
jobs: jobs:
check_deps_integrity: check_deps_integrity:
strategy:
matrix:
container:
- "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04"
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 container: ${{ matrix.container }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2

View File

@ -9,8 +9,14 @@ on:
jobs: jobs:
check_all: check_all:
strategy:
matrix:
container:
- "emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04"
- "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04"
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 container: ${{ matrix.container }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2

View File

@ -16,7 +16,7 @@ jobs:
- uses: gleam-lang/setup-erlang@v1.1.2 - uses: gleam-lang/setup-erlang@v1.1.2
id: install_erlang id: install_erlang
with: with:
otp-version: 23.2 otp-version: 24.0.5
- name: prepare - name: prepare
run: | run: |
if make emqx-ee --dry-run > /dev/null 2>&1; then if make emqx-ee --dry-run > /dev/null 2>&1; then
@ -59,8 +59,11 @@ jobs:
- name: make paho tests - name: make paho tests
run: | run: |
if ! docker exec -i python /scripts/pytest.sh; then if ! docker exec -i python /scripts/pytest.sh; then
echo "DUMP_CONTAINER_LOGS_BGN"
docker logs haproxy
docker logs node1.emqx.io docker logs node1.emqx.io
docker logs node2.emqx.io docker logs node2.emqx.io
echo "DUMP_CONTAINER_LOGS_END"
exit 1 exit 1
fi fi
@ -72,7 +75,7 @@ jobs:
- uses: gleam-lang/setup-erlang@v1.1.2 - uses: gleam-lang/setup-erlang@v1.1.2
id: install_erlang id: install_erlang
with: with:
otp-version: 23.2 otp-version: 24.0.5
- name: prepare - name: prepare
run: | run: |
if make emqx-ee --dry-run > /dev/null 2>&1; then if make emqx-ee --dry-run > /dev/null 2>&1; then
@ -183,8 +186,15 @@ jobs:
exit $RESULT exit $RESULT
relup_test: relup_test:
strategy:
matrix:
container:
- "emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04"
- "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04"
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 container: ${{ matrix.container }}
defaults: defaults:
run: run:
shell: bash shell: bash
@ -295,4 +305,3 @@ jobs:
with: with:
name: lux_logs name: lux_logs
path: lux_logs path: lux_logs

View File

@ -9,8 +9,14 @@ on:
jobs: jobs:
run_static_analysis: run_static_analysis:
strategy:
matrix:
container:
- "emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04"
- "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04"
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 container: ${{ matrix.container }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
@ -26,8 +32,14 @@ jobs:
run: make dialyzer run: make dialyzer
run_proper_test: run_proper_test:
strategy:
matrix:
container:
- "emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04"
- "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04"
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 container: ${{ matrix.container }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
@ -41,6 +53,12 @@ jobs:
run: make proper run: make proper
run_common_test: run_common_test:
strategy:
matrix:
otp_release:
- "erlang23"
- "erlang24"
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
steps: steps:
@ -73,15 +91,15 @@ jobs:
up -d --build up -d --build
- name: run eunit - name: run eunit
run: | run: |
docker exec -i erlang bash -c "make eunit" docker exec -i ${{ matrix.otp_release }} bash -c "make eunit"
- name: run common test - name: run common test
run: | run: |
docker exec -i erlang bash -c "make ct" docker exec -i ${{ matrix.otp_release }} bash -c "make ct"
- name: run cover - name: run cover
run: | run: |
printenv > .env printenv > .env
docker exec -i erlang bash -c "make cover" docker exec -i ${{ matrix.otp_release }} bash -c "make cover"
docker exec --env-file .env -i erlang bash -c "make coveralls" docker exec --env-file .env -i ${{ matrix.otp_release }} bash -c "make coveralls"
- name: cat rebar.crashdump - name: cat rebar.crashdump
if: failure() if: failure()
run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi

View File

@ -1 +1 @@
erlang 24.0.1-emqx-1 erlang 24.0.5-emqx-1

View File

@ -5,7 +5,7 @@ BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts SCRIPTS = $(CURDIR)/scripts
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
export EMQX_DESC ?= EMQ X export EMQX_DESC ?= EMQ X
export EMQX_DASHBOARD_VERSION ?= v5.0.0-beta.3 export EMQX_DASHBOARD_VERSION ?= v5.0.0-beta.4
ifeq ($(OS),Windows_NT) ifeq ($(OS),Windows_NT)
export REBAR_COLOR=none export REBAR_COLOR=none
endif endif

View File

@ -18,7 +18,7 @@
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.11.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.11.0"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
]}. ]}.
{plugins, [rebar3_proper]}. {plugins, [rebar3_proper]}.

View File

@ -32,6 +32,7 @@
-export([ get/1 -export([ get/1
, get/2 , get/2
, find/1 , find/1
, find_raw/1
, put/1 , put/1
, put/2 , put/2
]). ]).
@ -60,11 +61,19 @@
, put_raw/2 , put_raw/2
]). ]).
-define(CONF, fun(ROOT) -> {?MODULE, bin(ROOT)} end). -define(CONF, conf).
-define(RAW_CONF, fun(ROOT) -> {?MODULE, raw, bin(ROOT)} end). -define(RAW_CONF, raw_conf).
-define(PERSIS_KEY(TYPE, ROOT), {?MODULE, TYPE, ROOT}).
-define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]). -define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
-define(LISTENER_CONF_PATH(ZONE, LISTENER, PATH), [zones, ZONE, listeners, LISTENER | PATH]). -define(LISTENER_CONF_PATH(ZONE, LISTENER, PATH), [zones, ZONE, listeners, LISTENER | PATH]).
-define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL),
try [atom(Key) || Key <- PATH] of
AtomKeyPath -> EXP
catch
error:badarg -> EXP_ON_FAIL
end).
-export_type([update_request/0, raw_config/0, config/0]). -export_type([update_request/0, raw_config/0, config/0]).
-type update_request() :: term(). -type update_request() :: term().
%% raw_config() is the config that is NOT parsed and tranlated by hocon schema %% raw_config() is the config that is NOT parsed and tranlated by hocon schema
@ -93,8 +102,28 @@ get(KeyPath, Default) -> do_get(?CONF, KeyPath, Default).
-spec find(emqx_map_lib:config_key_path()) -> -spec find(emqx_map_lib:config_key_path()) ->
{ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}. {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
find([]) ->
Ref = make_ref(),
Res = do_get(?CONF, [], Ref),
case Res =:= Ref of
true -> {not_found, []};
false -> {ok, Res}
end;
find(KeyPath) -> find(KeyPath) ->
emqx_map_lib:deep_find(KeyPath, get_root(KeyPath)). ?ATOM_CONF_PATH(KeyPath, emqx_map_lib:deep_find(AtomKeyPath, get_root(KeyPath)),
{not_found, KeyPath}).
-spec find_raw(emqx_map_lib:config_key_path()) ->
{ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
find_raw([]) ->
Ref = make_ref(),
Res = do_get(?RAW_CONF, [], Ref),
case Res =:= Ref of
true -> {not_found, []};
false -> {ok, Res}
end;
find_raw(KeyPath) ->
emqx_map_lib:deep_find([bin(Key) || Key <- KeyPath], get_root_raw(KeyPath)).
-spec get_zone_conf(atom(), emqx_map_lib:config_key_path()) -> term(). -spec get_zone_conf(atom(), emqx_map_lib:config_key_path()) -> term().
get_zone_conf(Zone, KeyPath) -> get_zone_conf(Zone, KeyPath) ->
@ -141,20 +170,20 @@ put(KeyPath, Config) -> do_put(?CONF, KeyPath, Config).
-spec update(emqx_map_lib:config_key_path(), update_request()) -> -spec update(emqx_map_lib:config_key_path(), update_request()) ->
ok | {error, term()}. ok | {error, term()}.
update(ConfKeyPath, UpdateReq) -> update(KeyPath, UpdateReq) ->
update(emqx_schema, ConfKeyPath, UpdateReq). update(emqx_schema, KeyPath, UpdateReq).
-spec update(module(), emqx_map_lib:config_key_path(), update_request()) -> -spec update(module(), emqx_map_lib:config_key_path(), update_request()) ->
ok | {error, term()}. ok | {error, term()}.
update(SchemaModule, ConfKeyPath, UpdateReq) -> update(SchemaModule, KeyPath, UpdateReq) ->
emqx_config_handler:update_config(SchemaModule, ConfKeyPath, UpdateReq). emqx_config_handler:update_config(SchemaModule, KeyPath, {update, UpdateReq}).
-spec remove(emqx_map_lib:config_key_path()) -> ok | {error, term()}. -spec remove(emqx_map_lib:config_key_path()) -> ok | {error, term()}.
remove(ConfKeyPath) -> remove(KeyPath) ->
remove(emqx_schema, ConfKeyPath). remove(emqx_schema, KeyPath).
remove(SchemaModule, ConfKeyPath) -> remove(SchemaModule, KeyPath) ->
emqx_config_handler:remove_config(SchemaModule, ConfKeyPath). emqx_config_handler:update_config(SchemaModule, KeyPath, remove).
-spec get_raw(emqx_map_lib:config_key_path()) -> term(). -spec get_raw(emqx_map_lib:config_key_path()) -> term().
get_raw(KeyPath) -> do_get(?RAW_CONF, KeyPath). get_raw(KeyPath) -> do_get(?RAW_CONF, KeyPath).
@ -262,24 +291,60 @@ load_hocon_file(FileName, LoadType) ->
emqx_override_conf_name() -> emqx_override_conf_name() ->
application:get_env(emqx, override_conf_file, "emqx_override.conf"). application:get_env(emqx, override_conf_file, "emqx_override.conf").
bin(Bin) when is_binary(Bin) -> Bin; do_get(Type, KeyPath) ->
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
do_get(PtKey, KeyPath) ->
Ref = make_ref(), Ref = make_ref(),
Res = do_get(PtKey, KeyPath, Ref), Res = do_get(Type, KeyPath, Ref),
case Res =:= Ref of case Res =:= Ref of
true -> error({config_not_found, KeyPath}); true -> error({config_not_found, KeyPath});
false -> Res false -> Res
end. end.
do_get(PtKey, [RootName], Default) -> do_get(Type, [], Default) ->
persistent_term:get(PtKey(RootName), Default); AllConf = lists:foldl(fun
do_get(PtKey, [RootName | KeyPath], Default) -> ({?PERSIS_KEY(Type0, RootName), Conf}, AccIn) when Type0 == Type ->
RootV = persistent_term:get(PtKey(RootName), #{}), AccIn#{conf_key(Type0, RootName) => Conf};
emqx_map_lib:deep_get(KeyPath, RootV, Default). (_, AccIn) -> AccIn
end, #{}, persistent_term:get()),
case map_size(AllConf) == 0 of
true -> Default;
false -> AllConf
end;
do_get(Type, [RootName], Default) ->
persistent_term:get(?PERSIS_KEY(Type, bin(RootName)), Default);
do_get(Type, [RootName | KeyPath], Default) ->
RootV = persistent_term:get(?PERSIS_KEY(Type, bin(RootName)), #{}),
do_deep_get(Type, KeyPath, RootV, Default).
do_put(PtKey, [RootName | KeyPath], DeepValue) -> do_put(Type, [], DeepValue) ->
OldValue = do_get(PtKey, [RootName], #{}), maps:fold(fun(RootName, Value, _Res) ->
NewValue = emqx_map_lib:deep_put(KeyPath, OldValue, DeepValue), do_put(Type, [RootName], Value)
persistent_term:put(PtKey(RootName), NewValue). end, ok, DeepValue);
do_put(Type, [RootName | KeyPath], DeepValue) ->
OldValue = do_get(Type, [RootName], #{}),
NewValue = do_deep_put(Type, KeyPath, OldValue, DeepValue),
persistent_term:put(?PERSIS_KEY(Type, bin(RootName)), NewValue).
do_deep_get(?CONF, KeyPath, Map, Default) ->
?ATOM_CONF_PATH(KeyPath, emqx_map_lib:deep_get(AtomKeyPath, Map, Default),
Default);
do_deep_get(?RAW_CONF, KeyPath, Map, Default) ->
emqx_map_lib:deep_get([bin(Key) || Key <- KeyPath], Map, Default).
do_deep_put(?CONF, KeyPath, Map, Value) ->
?ATOM_CONF_PATH(KeyPath, emqx_map_lib:deep_put(AtomKeyPath, Map, Value),
error({not_found, KeyPath}));
do_deep_put(?RAW_CONF, KeyPath, Map, Value) ->
emqx_map_lib:deep_put([bin(Key) || Key <- KeyPath], Map, Value).
atom(Bin) when is_binary(Bin) ->
binary_to_existing_atom(Bin, latin1);
atom(Atom) when is_atom(Atom) ->
Atom.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
conf_key(?CONF, RootName) ->
atom(RootName);
conf_key(?RAW_CONF, RootName) ->
bin(RootName).

View File

@ -25,7 +25,6 @@
-export([ start_link/0 -export([ start_link/0
, add_handler/2 , add_handler/2
, update_config/3 , update_config/3
, remove_config/2
, merge_to_old_config/2 , merge_to_old_config/2
]). ]).
@ -38,10 +37,10 @@
code_change/3]). code_change/3]).
-define(MOD, {mod}). -define(MOD, {mod}).
-define(REMOVE_CONF, '$remove_config').
-type handler_name() :: module(). -type handler_name() :: module().
-type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}. -type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}.
-type update_args() :: {update, emqx_config:update_request()} | remove.
-optional_callbacks([ pre_config_update/2 -optional_callbacks([ pre_config_update/2
, post_config_update/3 , post_config_update/3
@ -61,15 +60,10 @@
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, {}, []). gen_server:start_link({local, ?MODULE}, ?MODULE, {}, []).
-spec update_config(module(), emqx_config:config_key_path(), emqx_config:update_request()) -> -spec update_config(module(), emqx_config:config_key_path(), update_args()) ->
ok | {error, term()}. ok | {error, term()}.
update_config(SchemaModule, ConfKeyPath, UpdateReq) when UpdateReq =/= ?REMOVE_CONF -> update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
gen_server:call(?MODULE, {change_config, SchemaModule, ConfKeyPath, UpdateReq}). gen_server:call(?MODULE, {change_config, SchemaModule, ConfKeyPath, UpdateArgs}).
-spec remove_config(module(), emqx_config:config_key_path()) ->
ok | {error, term()}.
remove_config(SchemaModule, ConfKeyPath) ->
gen_server:call(?MODULE, {change_config, SchemaModule, ConfKeyPath, ?REMOVE_CONF}).
-spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok. -spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok.
add_handler(ConfKeyPath, HandlerName) -> add_handler(ConfKeyPath, HandlerName) ->
@ -86,15 +80,15 @@ handle_call({add_child, ConfKeyPath, HandlerName}, _From,
{reply, ok, State#{handlers => {reply, ok, State#{handlers =>
emqx_map_lib:deep_put(ConfKeyPath, Handlers, #{?MOD => HandlerName})}}; emqx_map_lib:deep_put(ConfKeyPath, Handlers, #{?MOD => HandlerName})}};
handle_call({change_config, SchemaModule, ConfKeyPath, UpdateReq}, _From, handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
#{handlers := Handlers} = State) -> #{handlers := Handlers} = State) ->
OldConf = emqx_config:get_root(ConfKeyPath), OldConf = emqx_config:get_root(ConfKeyPath),
OldRawConf = emqx_config:get_root_raw(ConfKeyPath), OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
Result = try Result = try
{NewRawConf, OverrideConf} = process_upadate_request(ConfKeyPath, OldRawConf, {NewRawConf, OverrideConf} = process_upadate_request(ConfKeyPath, OldRawConf,
Handlers, UpdateReq), Handlers, UpdateArgs),
{AppEnvs, CheckedConf} = emqx_config:check_config(SchemaModule, NewRawConf), {AppEnvs, CheckedConf} = emqx_config:check_config(SchemaModule, NewRawConf),
_ = do_post_config_update(ConfKeyPath, Handlers, OldConf, CheckedConf, UpdateReq), _ = do_post_config_update(ConfKeyPath, Handlers, OldConf, CheckedConf, UpdateArgs),
emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf) emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf)
catch Error:Reason:ST -> catch Error:Reason:ST ->
?LOG(error, "change_config failed: ~p", [{Error, Reason, ST}]), ?LOG(error, "change_config failed: ~p", [{Error, Reason, ST}]),
@ -118,12 +112,12 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
process_upadate_request(ConfKeyPath, OldRawConf, _Handlers, ?REMOVE_CONF) -> process_upadate_request(ConfKeyPath, OldRawConf, _Handlers, remove) ->
BinKeyPath = bin_path(ConfKeyPath), BinKeyPath = bin_path(ConfKeyPath),
NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf), NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf),
OverrideConf = emqx_map_lib:deep_remove(BinKeyPath, emqx_config:read_override_conf()), OverrideConf = emqx_map_lib:deep_remove(BinKeyPath, emqx_config:read_override_conf()),
{NewRawConf, OverrideConf}; {NewRawConf, OverrideConf};
process_upadate_request(ConfKeyPath, OldRawConf, Handlers, UpdateReq) -> process_upadate_request(ConfKeyPath, OldRawConf, Handlers, {update, UpdateReq}) ->
NewRawConf = do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq), NewRawConf = do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq),
OverrideConf = update_override_config(NewRawConf), OverrideConf = update_override_config(NewRawConf),
{NewRawConf, OverrideConf}. {NewRawConf, OverrideConf}.
@ -136,14 +130,14 @@ do_update_config([ConfKey | ConfKeyPath], Handlers, OldRawConf, UpdateReq) ->
NewUpdateReq = do_update_config(ConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq), NewUpdateReq = do_update_config(ConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq),
call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq}). call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq}).
do_post_config_update([], Handlers, OldConf, NewConf, UpdateReq) -> do_post_config_update([], Handlers, OldConf, NewConf, UpdateArgs) ->
call_post_config_update(Handlers, OldConf, NewConf, UpdateReq); call_post_config_update(Handlers, OldConf, NewConf, up_req(UpdateArgs));
do_post_config_update([ConfKey | ConfKeyPath], Handlers, OldConf, NewConf, UpdateReq) -> do_post_config_update([ConfKey | ConfKeyPath], Handlers, OldConf, NewConf, UpdateArgs) ->
SubOldConf = get_sub_config(ConfKey, OldConf), SubOldConf = get_sub_config(ConfKey, OldConf),
SubNewConf = get_sub_config(ConfKey, NewConf), SubNewConf = get_sub_config(ConfKey, NewConf),
SubHandlers = maps:get(ConfKey, Handlers, #{}), SubHandlers = maps:get(ConfKey, Handlers, #{}),
_ = do_post_config_update(ConfKeyPath, SubHandlers, SubOldConf, SubNewConf, UpdateReq), _ = do_post_config_update(ConfKeyPath, SubHandlers, SubOldConf, SubNewConf, UpdateArgs),
call_post_config_update(Handlers, OldConf, NewConf, UpdateReq). call_post_config_update(Handlers, OldConf, NewConf, up_req(UpdateArgs)).
get_sub_config(ConfKey, Conf) when is_map(Conf) -> get_sub_config(ConfKey, Conf) when is_map(Conf) ->
maps:get(ConfKey, Conf, undefined); maps:get(ConfKey, Conf, undefined);
@ -178,6 +172,9 @@ update_override_config(RawConf) ->
OldConf = emqx_config:read_override_conf(), OldConf = emqx_config:read_override_conf(),
maps:merge(OldConf, RawConf). maps:merge(OldConf, RawConf).
up_req(remove) -> '$remove';
up_req({update, Req}) -> Req.
bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath]. bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath].
bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(A) when is_atom(A) -> atom_to_binary(A, utf8);

View File

@ -20,11 +20,17 @@
-include("emqx_authz.hrl"). -include("emqx_authz.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-export([ register_metrics/0 -export([ register_metrics/0
, init/0 , init/0
, init_rule/1 , init_rule/1
, lookup/0 , lookup/0
, lookup/1
, move/2
, update/2 , update/2
, authorize/5 , authorize/5
, match/4 , match/4
@ -41,42 +47,160 @@ register_metrics() ->
init() -> init() ->
ok = register_metrics(), ok = register_metrics(),
emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
NRules = [init_rule(Rule) || Rule <- lookup()], NRules = [init_rule(Rule) || Rule <- emqx_config:get(?CONF_KEY_PATH, [])],
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NRules]}, -1). ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NRules]}, -1).
lookup() -> lookup() ->
emqx_config:get(?CONF_KEY_PATH, []). {_M, _F, [A]}= find_action_in_hooks(),
A.
lookup(Id) ->
try find_rule_by_id(Id, lookup()) of
{_, Rule} -> Rule
catch
error:Reason -> {error, Reason}
end.
move(Id, Position) ->
emqx_config:update(emqx_authz_schema, ?CONF_KEY_PATH, {move, Id, Position}).
update(Cmd, Rules) -> update(Cmd, Rules) ->
emqx_config:update(emqx_authz_schema, ?CONF_KEY_PATH, {Cmd, Rules}). emqx_config:update(emqx_authz_schema, ?CONF_KEY_PATH, {Cmd, Rules}).
%% For now we only support re-creating the entire rule list pre_config_update({move, Id, <<"top">>}, Conf) when is_list(Conf) ->
pre_config_update({head, Rule}, OldConf) when is_map(Rule), is_list(OldConf) -> {Index, _} = find_rule_by_id(Id),
[Rule | OldConf]; {List1, List2} = lists:split(Index, Conf),
pre_config_update({tail, Rule}, OldConf) when is_map(Rule), is_list(OldConf) -> [lists:nth(Index, Conf)] ++ lists:droplast(List1) ++ List2;
OldConf ++ [Rule];
pre_config_update({_, NewConf}, _OldConf) ->
%% overwrite the entire config!
case is_list(NewConf) of
true -> NewConf;
false -> [NewConf]
end.
post_config_update(_, undefined, _OldConf) -> pre_config_update({move, Id, <<"bottom">>}, Conf) when is_list(Conf) ->
%_ = [release_rules(Rule) || Rule <- OldConf], {Index, _} = find_rule_by_id(Id),
{List1, List2} = lists:split(Index, Conf),
lists:droplast(List1) ++ List2 ++ [lists:nth(Index, Conf)];
pre_config_update({move, Id, #{<<"before">> := BeforeId}}, Conf) when is_list(Conf) ->
{Index1, _} = find_rule_by_id(Id),
Conf1 = lists:nth(Index1, Conf),
{Index2, _} = find_rule_by_id(BeforeId),
Conf2 = lists:nth(Index2, Conf),
{List1, List2} = lists:split(Index2, Conf),
lists:delete(Conf1, lists:droplast(List1))
++ [Conf1] ++ [Conf2]
++ lists:delete(Conf1, List2);
pre_config_update({move, Id, #{<<"after">> := AfterId}}, Conf) when is_list(Conf) ->
{Index1, _} = find_rule_by_id(Id),
Conf1 = lists:nth(Index1, Conf),
{Index2, _} = find_rule_by_id(AfterId),
{List1, List2} = lists:split(Index2, Conf),
lists:delete(Conf1, List1)
++ [Conf1]
++ lists:delete(Conf1, List2);
pre_config_update({head, Rules}, Conf) when is_list(Rules), is_list(Conf) ->
Rules ++ Conf;
pre_config_update({tail, Rules}, Conf) when is_list(Rules), is_list(Conf) ->
Conf ++ Rules;
pre_config_update({{replace_once, Id}, Rule}, Conf) when is_map(Rule), is_list(Conf) ->
{Index, _} = find_rule_by_id(Id),
{List1, List2} = lists:split(Index, Conf),
lists:droplast(List1) ++ [Rule] ++ List2;
pre_config_update({_, Rules}, _Conf) when is_list(Rules)->
%% overwrite the entire config!
Rules.
post_config_update(_, undefined, _Conf) ->
ok; ok;
post_config_update({move, Id, <<"top">>}, _NewRules, _OldRules) ->
InitedRules = lookup(),
{Index, Rule} = find_rule_by_id(Id, InitedRules),
{Rules1, Rules2 } = lists:split(Index, InitedRules),
Rules3 = [Rule] ++ lists:droplast(Rules1) ++ Rules2,
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Rules3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({move, Id, <<"bottom">>}, _NewRules, _OldRules) ->
InitedRules = lookup(),
{Index, Rule} = find_rule_by_id(Id, InitedRules),
{Rules1, Rules2 } = lists:split(Index, InitedRules),
Rules3 = lists:droplast(Rules1) ++ Rules2 ++ [Rule],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Rules3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({move, Id, #{<<"before">> := BeforeId}}, _NewRules, _OldRules) ->
InitedRules = lookup(),
{_, Rule0} = find_rule_by_id(Id, InitedRules),
{Index, Rule1} = find_rule_by_id(BeforeId, InitedRules),
{Rules1, Rules2} = lists:split(Index, InitedRules),
Rules3 = lists:delete(Rule0, lists:droplast(Rules1))
++ [Rule0] ++ [Rule1]
++ lists:delete(Rule0, Rules2),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Rules3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({move, Id, #{<<"after">> := AfterId}}, _NewRules, _OldRules) ->
InitedRules = lookup(),
{_, Rule} = find_rule_by_id(Id, InitedRules),
{Index, _} = find_rule_by_id(AfterId, InitedRules),
{Rules1, Rules2} = lists:split(Index, InitedRules),
Rules3 = lists:delete(Rule, Rules1)
++ [Rule]
++ lists:delete(Rule, Rules2),
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Rules3]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({head, Rules}, _NewRules, _OldConf) ->
InitedRules = [init_rule(R) || R <- check_rules(Rules)],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedRules ++ lookup()]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({tail, Rules}, _NewRules, _OldConf) ->
InitedRules = [init_rule(R) || R <- check_rules(Rules)],
emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedRules]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update({{replace_once, Id}, Rule}, _NewRules, _OldConf) when is_map(Rule) ->
OldInitedRules = lookup(),
{Index, OldRule} = find_rule_by_id(Id, OldInitedRules),
case maps:get(type, OldRule, undefined) of
undefined -> ok;
_ ->
#{annotations := #{id := Id}} = OldRule,
ok = emqx_resource:remove(Id)
end,
{OldRules1, OldRules2 } = lists:split(Index, OldInitedRules),
InitedRules = [init_rule(R#{annotations => #{id => Id}}) || R <- check_rules([Rule])],
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [lists:droplast(OldRules1) ++ InitedRules ++ OldRules2]}, -1),
ok = emqx_authz_cache:drain_cache();
post_config_update(_, NewRules, _OldConf) -> post_config_update(_, NewRules, _OldConf) ->
%_ = [release_rules(Rule) || Rule <- OldConf], %% overwrite the entire config!
OldInitedRules = lookup(),
InitedRules = [init_rule(Rule) || Rule <- NewRules], InitedRules = [init_rule(Rule) || Rule <- NewRules],
Action = find_action_in_hooks(), ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedRules]}, -1),
ok = emqx_hooks:del('client.authorize', Action), lists:foreach(fun (#{type := _Type, enable := true, annotations := #{id := Id}}) ->
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [InitedRules]}, -1), ok = emqx_resource:remove(Id);
(_) -> ok
end, OldInitedRules),
ok = emqx_authz_cache:drain_cache(). ok = emqx_authz_cache:drain_cache().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
check_rules(RawRules) ->
{ok, Conf} = hocon:binary(jsx:encode(#{<<"authorization">> => #{<<"rules">> => RawRules}}), #{format => richmap}),
CheckConf = hocon_schema:check(emqx_authz_schema, Conf, #{atom_key => true}),
#{authorization := #{rules := Rules}} = hocon_schema:richmap_to_map(CheckConf),
Rules.
find_rule_by_id(Id) -> find_rule_by_id(Id, lookup()).
find_rule_by_id(Id, Rules) -> find_rule_by_id(Id, Rules, 1).
find_rule_by_id(_RuleId, [], _N) -> error(not_found_rule);
find_rule_by_id(RuleId, [ Rule = #{annotations := #{id := Id}} | Tail], N) ->
case RuleId =:= Id of
true -> {N, Rule};
false -> find_rule_by_id(RuleId, Tail, N + 1)
end.
find_action_in_hooks() -> find_action_in_hooks() ->
Callbacks = emqx_hooks:lookup('client.authorize'), Callbacks = emqx_hooks:lookup('client.authorize'),
[Action] = [Action || {callback,{?MODULE, authorize, _} = Action, _, _} <- Callbacks ], [Action] = [Action || {callback,{?MODULE, authorize, _} = Action, _, _} <- Callbacks ],
@ -85,6 +209,19 @@ find_action_in_hooks() ->
gen_id(Type) -> gen_id(Type) ->
iolist_to_binary([io_lib:format("~s_~s",[?APP, Type]), "_", integer_to_list(erlang:system_time())]). iolist_to_binary([io_lib:format("~s_~s",[?APP, Type]), "_", integer_to_list(erlang:system_time())]).
create_resource(#{type := DB,
config := Config,
annotations := #{id := ResourceID}}) ->
case emqx_resource:update(
ResourceID,
list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
Config,
[])
of
{ok, _} -> ResourceID;
{error, already_created} -> ResourceID;
{error, Reason} -> {error, Reason}
end;
create_resource(#{type := DB, create_resource(#{type := DB,
config := Config}) -> config := Config}) ->
ResourceID = gen_id(DB), ResourceID = gen_id(DB),
@ -102,13 +239,19 @@ create_resource(#{type := DB,
init_rule(#{topics := Topics, init_rule(#{topics := Topics,
action := Action, action := Action,
permission := Permission, permission := Permission,
principal := Principal principal := Principal,
annotations := #{id := Id}
} = Rule) when ?ALLOW_DENY(Permission), ?PUBSUB(Action), is_list(Topics) -> } = Rule) when ?ALLOW_DENY(Permission), ?PUBSUB(Action), is_list(Topics) ->
Rule#{annotations => Rule#{annotations =>
#{id => gen_id(simple), #{id => Id,
principal => compile_principal(Principal), principal => compile_principal(Principal),
topics => [compile_topic(Topic) || Topic <- Topics]} topics => [compile_topic(Topic) || Topic <- Topics]}
}; };
init_rule(#{topics := Topics,
action := Action,
permission := Permission
} = Rule) when ?ALLOW_DENY(Permission), ?PUBSUB(Action), is_list(Topics) ->
init_rule(Rule#{annotations =>#{id => gen_id(simple)}});
init_rule(#{principal := Principal, init_rule(#{principal := Principal,
enable := true, enable := true,

View File

@ -16,74 +16,502 @@
-module(emqx_authz_api). -module(emqx_authz_api).
-behavior(minirest_api).
-include("emqx_authz.hrl"). -include("emqx_authz.hrl").
-rest_api(#{name => lookup_authz, -define(EXAMPLE_RETURNED_RULE1,
method => 'GET', #{principal => <<"all">>,
path => "/authz", permission => <<"allow">>,
func => lookup_authz, action => <<"all">>,
descr => "Lookup Authorization" topics => [<<"#">>],
annotations => #{id => 1}
}). }).
-rest_api(#{name => update_authz,
method => 'PUT', -define(EXAMPLE_RETURNED_RULES,
path => "/authz", #{rules => [?EXAMPLE_RETURNED_RULE1
func => update_authz, ]
descr => "Rewrite authz list"
}). }).
-rest_api(#{name => append_authz, -define(EXAMPLE_RULE1, #{principal => <<"all">>,
method => 'POST', permission => <<"allow">>,
path => "/authz/append", action => <<"all">>,
func => append_authz, topics => [<<"#">>]}).
descr => "Add a new rule at the end of the authz list"
}).
-rest_api(#{name => push_authz, -export([ api_spec/0
method => 'POST', , rules/2
path => "/authz/push", , rule/2
func => push_authz, , move_rule/2
descr => "Add a new rule at the start of the authz list"
}).
-export([ lookup_authz/2
, update_authz/2
, append_authz/2
, push_authz/2
]). ]).
lookup_authz(_Bindings, _Params) -> api_spec() ->
return({ok, emqx_authz:lookup()}). {[ rules_api()
, rule_api()
, move_rule_api()
], definitions()}.
update_authz(_Bindings, Params) -> definitions() -> emqx_authz_api_schema:definitions().
Rules = form_rules(Params),
return(emqx_authz:update(replace, Rules)).
append_authz(_Bindings, Params) -> rules_api() ->
Rules = form_rules(Params), Metadata = #{
return(emqx_authz:update(tail, Rules)). get => #{
description => "List authorization rules",
parameters => [
#{
name => page,
in => query,
schema => #{
type => integer
},
required => false
},
#{
name => limit,
in => query,
schema => #{
type => integer
},
required => false
}
],
responses => #{
<<"200">> => #{
description => <<"OK">>,
content => #{
'application/json' => #{
schema => #{
type => object,
required => [rules],
properties => #{rules => #{
type => array,
items => minirest:ref(<<"returned_rules">>)
}
}
},
examples => #{
rules => #{
summary => <<"Rules">>,
value => jsx:encode(?EXAMPLE_RETURNED_RULES)
}
}
}
}
}
}
},
post => #{
description => "Add new rule",
requestBody => #{
content => #{
'application/json' => #{
schema => minirest:ref(<<"rules">>),
examples => #{
simple_rule => #{
summary => <<"Rules">>,
value => jsx:encode(?EXAMPLE_RULE1)
}
}
}
}
},
responses => #{
<<"204">> => #{description => <<"Created">>},
<<"400">> => #{
description => <<"Bad Request">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"error">>),
examples => #{
example1 => #{
summary => <<"Bad Request">>,
value => #{
code => <<"BAD_REQUEST">>,
message => <<"Bad Request">>
}
}
}
}
}
}
}
},
put => #{
push_authz(_Bindings, Params) -> description => "Update all rules",
Rules = form_rules(Params), requestBody => #{
return(emqx_authz:update(head, Rules)). content => #{
'application/json' => #{
schema => #{
type => array,
items => minirest:ref(<<"returned_rules">>)
},
examples => #{
rules => #{
summary => <<"Rules">>,
value => jsx:encode([?EXAMPLE_RULE1])
}
}
}
}
},
responses => #{
<<"204">> => #{description => <<"Created">>},
<<"400">> => #{
description => <<"Bad Request">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"error">>),
examples => #{
example1 => #{
summary => <<"Bad Request">>,
value => #{
code => <<"BAD_REQUEST">>,
message => <<"Bad Request">>
}
}
}
}
}
}
}
}
},
{"/authorization", Metadata, rules}.
%%------------------------------------------------------------------------------ rule_api() ->
%% Interval Funcs Metadata = #{
%%------------------------------------------------------------------------------ get => #{
description => "List authorization rules",
parameters => [
#{
name => id,
in => path,
schema => #{
type => string
},
required => true
}
],
responses => #{
<<"200">> => #{
description => <<"OK">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"returned_rules">>),
examples => #{
rules => #{
summary => <<"Rules">>,
value => jsx:encode(?EXAMPLE_RETURNED_RULE1)
}
}
}
}
},
<<"404">> => #{
description => <<"Bad Request">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"error">>),
examples => #{
example1 => #{
summary => <<"Not Found">>,
value => #{
code => <<"NOT_FOUND">>,
message => <<"rule xxx not found">>
}
}
}
}
}
}
}
},
put => #{
description => "Update rule",
parameters => [
#{
name => id,
in => path,
schema => #{
type => string
},
required => true
}
],
requestBody => #{
content => #{
'application/json' => #{
schema => minirest:ref(<<"rules">>),
examples => #{
simple_rule => #{
summary => <<"Rules">>,
value => jsx:encode(?EXAMPLE_RULE1)
}
}
}
}
},
responses => #{
<<"204">> => #{description => <<"No Content">>},
<<"404">> => #{
description => <<"Bad Request">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"error">>),
examples => #{
example1 => #{
summary => <<"Not Found">>,
value => #{
code => <<"NOT_FOUND">>,
message => <<"rule xxx not found">>
}
}
}
}
}
},
<<"400">> => #{
description => <<"Bad Request">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"error">>),
examples => #{
example1 => #{
summary => <<"Bad Request">>,
value => #{
code => <<"BAD_REQUEST">>,
message => <<"Bad Request">>
}
}
}
}
}
}
}
},
delete => #{
description => "Delete rule",
parameters => [
#{
name => id,
in => path,
schema => #{
type => string
},
required => true
}
],
responses => #{
<<"204">> => #{description => <<"No Content">>},
<<"400">> => #{
description => <<"Bad Request">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"error">>),
examples => #{
example1 => #{
summary => <<"Bad Request">>,
value => #{
code => <<"BAD_REQUEST">>,
message => <<"Bad Request">>
}
}
}
}
}
}
}
}
},
{"/authorization/:id", Metadata, rule}.
form_rules(Params) -> move_rule_api() ->
Params. Metadata = #{
post => #{
description => "Change the order of rules",
parameters => [
#{
name => id,
in => path,
schema => #{
type => string
},
required => true
}
],
requestBody => #{
content => #{
'application/json' => #{
schema => #{
type => object,
required => [position],
properties => #{
position => #{
oneOf => [
#{type => string,
enum => [<<"top">>, <<"bottom">>]
},
#{type => object,
required => ['after'],
properties => #{
'after' => #{
type => string
}
}
},
#{type => object,
required => ['before'],
properties => #{
'before' => #{
type => string
}
}
}
]
}
}
}
}
}
},
responses => #{
<<"204">> => #{
description => <<"No Content">>
},
<<"404">> => #{
description => <<"Bad Request">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"error">>),
examples => #{
example1 => #{
summary => <<"Not Found">>,
value => #{
code => <<"NOT_FOUND">>,
message => <<"rule xxx not found">>
}
}
}
}
}
},
<<"400">> => #{
description => <<"Bad Request">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"error">>),
examples => #{
example1 => #{
summary => <<"Bad Request">>,
value => #{
code => <<"BAD_REQUEST">>,
message => <<"Bad Request">>
}
}
}
}
}
}
}
}
},
{"/authorization/:id/move", Metadata, move_rule}.
%%-------------------------------------------------------------------- rules(get, Request) ->
%% EUnits Rules = lists:foldl(fun (#{type := _Type, enable := true, annotations := #{id := Id} = Annotations} = Rule, AccIn) ->
%%-------------------------------------------------------------------- NRule = case emqx_resource:health_check(Id) of
ok ->
Rule#{annotations => Annotations#{status => healthy}};
_ ->
Rule#{annotations => Annotations#{status => unhealthy}}
end,
lists:append(AccIn, [NRule]);
(Rule, AccIn) ->
lists:append(AccIn, [Rule])
end, [], emqx_authz:lookup()),
Query = cowboy_req:parse_qs(Request),
case lists:keymember(<<"page">>, 1, Query) andalso lists:keymember(<<"limit">>, 1, Query) of
true ->
{<<"page">>, Page} = lists:keyfind(<<"page">>, 1, Query),
{<<"limit">>, Limit} = lists:keyfind(<<"limit">>, 1, Query),
Index = (binary_to_integer(Page) - 1) * binary_to_integer(Limit),
{_, Rules1} = lists:split(Index, Rules),
case binary_to_integer(Limit) < length(Rules1) of
true ->
{Rules2, _} = lists:split(binary_to_integer(Limit), Rules1),
{200, #{rules => Rules2}};
false -> {200, #{rules => Rules1}}
end;
false -> {200, #{rules => Rules}}
end;
rules(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
RawConfig = jsx:decode(Body, [return_maps]),
case emqx_authz:update(head, [RawConfig]) of
ok -> {204};
{error, Reason} ->
{400, #{code => <<"BAD_REQUEST">>,
messgae => atom_to_binary(Reason)}}
end;
rules(put, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
RawConfig = jsx:decode(Body, [return_maps]),
case emqx_authz:update(replace, RawConfig) of
ok -> {204};
{error, Reason} ->
{400, #{code => <<"BAD_REQUEST">>,
messgae => atom_to_binary(Reason)}}
end.
-ifdef(TEST). rule(get, Request) ->
-include_lib("eunit/include/eunit.hrl"). Id = cowboy_req:binding(id, Request),
case emqx_authz:lookup(Id) of
{error, Reason} -> {404, #{messgae => atom_to_binary(Reason)}};
Rule ->
case maps:get(type, Rule, undefined) of
undefined -> {200, Rule};
_ ->
case emqx_resource:health_check(Id) of
ok ->
{200, Rule#{annotations => #{status => healthy}}};
_ ->
{200, Rule#{annotations => #{status => unhealthy}}}
end
end
-endif. end;
rule(put, Request) ->
return(_) -> RuleId = cowboy_req:binding(id, Request),
%% TODO: V5 api {ok, Body, _} = cowboy_req:read_body(Request),
ok. RawConfig = jsx:decode(Body, [return_maps]),
case emqx_authz:update({replace_once, RuleId}, RawConfig) of
ok -> {204};
{error, not_found_rule} ->
{404, #{code => <<"NOT_FOUND">>,
messgae => <<"rule ", RuleId/binary, " not found">>}};
{error, Reason} ->
{400, #{code => <<"BAD_REQUEST">>,
messgae => atom_to_binary(Reason)}}
end;
rule(delete, Request) ->
RuleId = cowboy_req:binding(id, Request),
case emqx_authz:update({replace_once, RuleId}, #{}) of
ok -> {204};
{error, Reason} ->
{400, #{code => <<"BAD_REQUEST">>,
messgae => atom_to_binary(Reason)}}
end.
move_rule(post, Request) ->
RuleId = cowboy_req:binding(id, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
#{<<"position">> := Position} = jsx:decode(Body, [return_maps]),
case emqx_authz:move(RuleId, Position) of
ok -> {204};
{error, not_found_rule} ->
{404, #{code => <<"NOT_FOUND">>,
messgae => <<"rule ", RuleId/binary, " not found">>}};
{error, Reason} ->
{400, #{code => <<"BAD_REQUEST">>,
messgae => atom_to_binary(Reason)}}
end.

View File

@ -0,0 +1,157 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authz_api_schema).
-export([definitions/0]).
definitions() ->
RetruenedRules = #{
allOf => [ #{type => object,
properties => #{
annotations => #{
type => object,
required => [id],
properties => #{
id => #{
type => string
},
principal => minirest:ref(<<"principal">>)
}
}
}
}
, minirest:ref(<<"rules">>)
]
},
Rules = #{
oneOf => [ minirest:ref(<<"simple_rule">>)
% , minirest:ref(<<"connector_redis">>)
]
},
% ConnectorRedis = #{
% type => object,
% required => [principal, type, enable, config, cmd]
% properties => #{
% principal => minirest:ref(<<"principal">>),
% type => #{
% type => string,
% enum => [<<"redis">>],
% example => <<"redis">>
% },
% enable => #{
% type => boolean,
% example => true
% }
% config => #{
% type =>
% }
% }
% }
SimpleRule = #{
type => object,
required => [principal, permission, action, topics],
properties => #{
action => #{
type => string,
enum => [<<"publish">>, <<"subscribe">>, <<"all">>],
example => <<"publish">>
},
permission => #{
type => string,
enum => [<<"allow">>, <<"deny">>],
example => <<"allow">>
},
topics => #{
type => array,
items => #{
oneOf => [ #{type => string, example => <<"#">>}
, #{type => object,
required => [eq],
properties => #{
eq => #{type => string}
},
example => #{eq => <<"#">>}
}
]
}
},
principal => minirest:ref(<<"principal">>)
}
},
Principal = #{
oneOf => [ minirest:ref(<<"principal_username">>)
, minirest:ref(<<"principal_clientid">>)
, minirest:ref(<<"principal_ipaddress">>)
, #{type => string, enum=>[<<"all">>], example => <<"all">>}
, #{type => object,
required => ['and'],
properties => #{'and' => #{type => array,
items => #{oneOf => [ minirest:ref(<<"principal_username">>)
, minirest:ref(<<"principal_clientid">>)
, minirest:ref(<<"principal_ipaddress">>)
]}}},
example => #{'and' => [#{username => <<"emqx">>}, #{clientid => <<"emqx">>}]}
}
, #{type => object,
required => ['or'],
properties => #{'and' => #{type => array,
items => #{oneOf => [ minirest:ref(<<"principal_username">>)
, minirest:ref(<<"principal_clientid">>)
, minirest:ref(<<"principal_ipaddress">>)
]}}},
example => #{'or' => [#{username => <<"emqx">>}, #{clientid => <<"emqx">>}]}
}
]
},
PrincipalUsername = #{type => object,
required => [username],
properties => #{username => #{type => string}},
example => #{username => <<"emqx">>}
},
PrincipalClientid = #{type => object,
required => [clientid],
properties => #{clientid => #{type => string}},
example => #{clientid => <<"emqx">>}
},
PrincipalIpaddress = #{type => object,
required => [ipaddress],
properties => #{ipaddress => #{type => string}},
example => #{ipaddress => <<"127.0.0.1">>}
},
ErrorDef = #{
type => object,
properties => #{
code => #{
type => string,
example => <<"BAD_REQUEST">>
},
message => #{
type => string
}
}
},
[ #{<<"returned_rules">> => RetruenedRules}
, #{<<"rules">> => Rules}
, #{<<"simple_rule">> => SimpleRule}
, #{<<"principal">> => Principal}
, #{<<"principal_username">> => PrincipalUsername}
, #{<<"principal_clientid">> => PrincipalClientid}
, #{<<"principal_ipaddress">> => PrincipalIpaddress}
, #{<<"error">> => ErrorDef}
].

View File

@ -22,6 +22,8 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {rules: []}">>).
all() -> all() ->
emqx_ct:all(?MODULE). emqx_ct:all(?MODULE).
@ -29,81 +31,144 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT),
ok = emqx_ct_helpers:start_apps([emqx_authz]), ok = emqx_ct_helpers:start_apps([emqx_authz]),
ok = emqx_config:update([zones, default, authorization, cache, enable], false), ok = emqx_config:update([zones, default, authorization, cache, enable], false),
ok = emqx_config:update([zones, default, authorization, enable], true), ok = emqx_config:update([zones, default, authorization, enable], true),
emqx_authz:update(replace, []),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_authz]). ok = emqx_authz:update(replace, []),
emqx_ct_helpers:stop_apps([emqx_authz]),
ok.
-define(RULE1, #{principal => all, init_per_testcase(_, Config) ->
topics => [<<"#">>], ok = emqx_authz:update(replace, []),
action => all, Config.
permission => deny}
-define(RULE1, #{<<"principal">> => <<"all">>,
<<"topics">> => [<<"#">>],
<<"action">> => <<"all">>,
<<"permission">> => <<"deny">>}
). ).
-define(RULE2, #{principal => -define(RULE2, #{<<"principal">> =>
#{ipaddress => <<"127.0.0.1">>}, #{<<"ipaddress">> => <<"127.0.0.1">>},
topics => <<"topics">> =>
[#{eq => <<"#">>}, [#{<<"eq">> => <<"#">>},
#{eq => <<"+">>} #{<<"eq">> => <<"+">>}
] , ] ,
action => all, <<"action">> => <<"all">>,
permission => allow} <<"permission">> => <<"allow">>}
). ).
-define(RULE3,#{principal => -define(RULE3,#{<<"principal">> =>
#{'and' => [#{username => "^test?"}, #{<<"and">> => [#{<<"username">> => <<"^test?">>},
#{clientid => "^test?"} #{<<"clientid">> => <<"^test?">>}
]}, ]},
topics => [<<"test">>], <<"topics">> => [<<"test">>],
action => publish, <<"action">> => <<"publish">>,
permission => allow} <<"permission">> => <<"allow">>}
). ).
-define(RULE4,#{principal => -define(RULE4,#{<<"principal">> =>
#{'or' => [#{username => <<"^test">>}, #{<<"or">> => [#{<<"username">> => <<"^test">>},
#{clientid => <<"test?">>} #{<<"clientid">> => <<"test?">>}
]}, ]},
topics => [<<"%u">>,<<"%c">>], <<"topics">> => [<<"%u">>,<<"%c">>],
action => publish, <<"action">> => <<"publish">>,
permission => deny} <<"permission">> => <<"deny">>}
). ).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_init_rule(_) ->
?assertMatch(#{annotations := #{id := _ID, t_update_rule(_) ->
ok = emqx_authz:update(replace, [?RULE2]),
ok = emqx_authz:update(head, [?RULE1]),
ok = emqx_authz:update(tail, [?RULE3]),
Lists1 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE3]),
?assertMatch(Lists1, emqx_config:get([authorization, rules], [])),
[#{annotations := #{id := Id1,
principal := all, principal := all,
topics := [['#']]} topics := [['#']]}
}, emqx_authz:init_rule(?RULE1)), },
?assertMatch(#{annotations := #{principal := #{annotations := #{id := Id2,
#{ipaddress := {{127,0,0,1},{127,0,0,1},32}}, principal := #{ipaddress := {{127,0,0,1},{127,0,0,1},32}},
topics := [#{eq := ['#']}, topics := [#{eq := ['#']}, #{eq := ['+']}]}
#{eq := ['+']}], },
id := _ID} #{annotations := #{id := Id3,
}, emqx_authz:init_rule(?RULE2)), principal :=
?assertMatch(#{annotations :=
#{principal :=
#{'and' := [#{username := {re_pattern, _, _, _, _}}, #{'and' := [#{username := {re_pattern, _, _, _, _}},
#{clientid := {re_pattern, _, _, _, _}} #{clientid := {re_pattern, _, _, _, _}}
] ]
}, },
topics := [[<<"test">>]], topics := [[<<"test">>]]}
id := _ID} }
}, emqx_authz:init_rule(?RULE3)), ] = emqx_authz:lookup(),
?assertMatch(#{annotations :=
#{principal := ok = emqx_authz:update({replace_once, Id3}, ?RULE4),
Lists2 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE4]),
?assertMatch(Lists2, emqx_config:get([authorization, rules], [])),
[#{annotations := #{id := Id1,
principal := all,
topics := [['#']]}
},
#{annotations := #{id := Id2,
principal := #{ipaddress := {{127,0,0,1},{127,0,0,1},32}},
topics := [#{eq := ['#']},
#{eq := ['+']}]}
},
#{annotations := #{id := Id3,
principal :=
#{'or' := [#{username := {re_pattern, _, _, _, _}}, #{'or' := [#{username := {re_pattern, _, _, _, _}},
#{clientid := {re_pattern, _, _, _, _}} #{clientid := {re_pattern, _, _, _, _}}
] ]
}, },
topics := [#{pattern := [<<"%u">>]}, topics := [#{pattern := [<<"%u">>]},
#{pattern := [<<"%c">>]} #{pattern := [<<"%c">>]}
], ]}
id := _ID} }
}, emqx_authz:init_rule(?RULE4)), ] = emqx_authz:lookup(),
ok = emqx_authz:update(replace, []).
t_move_rule(_) ->
ok = emqx_authz:update(replace, [?RULE1, ?RULE2, ?RULE3, ?RULE4]),
[#{annotations := #{id := Id1}},
#{annotations := #{id := Id2}},
#{annotations := #{id := Id3}},
#{annotations := #{id := Id4}}
] = emqx_authz:lookup(),
ok = emqx_authz:move(Id4, <<"top">>),
?assertMatch([#{annotations := #{id := Id4}},
#{annotations := #{id := Id1}},
#{annotations := #{id := Id2}},
#{annotations := #{id := Id3}}
], emqx_authz:lookup()),
ok = emqx_authz:move(Id1, <<"bottom">>),
?assertMatch([#{annotations := #{id := Id4}},
#{annotations := #{id := Id2}},
#{annotations := #{id := Id3}},
#{annotations := #{id := Id1}}
], emqx_authz:lookup()),
ok = emqx_authz:move(Id3, #{<<"before">> => Id4}),
?assertMatch([#{annotations := #{id := Id3}},
#{annotations := #{id := Id4}},
#{annotations := #{id := Id2}},
#{annotations := #{id := Id1}}
], emqx_authz:lookup()),
ok = emqx_authz:move(Id2, #{<<"after">> => Id1}),
?assertMatch([#{annotations := #{id := Id3}},
#{annotations := #{id := Id4}},
#{annotations := #{id := Id1}},
#{annotations := #{id := Id2}}
], emqx_authz:lookup()),
ok. ok.
t_authz(_) -> t_authz(_) ->
@ -132,10 +197,10 @@ t_authz(_) ->
listener => mqtt_tcp listener => mqtt_tcp
}, },
Rules1 = [emqx_authz:init_rule(Rule) || Rule <- [?RULE1, ?RULE2]], Rules1 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE1, ?RULE2])],
Rules2 = [emqx_authz:init_rule(Rule) || Rule <- [?RULE2, ?RULE1]], Rules2 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE2, ?RULE1])],
Rules3 = [emqx_authz:init_rule(Rule) || Rule <- [?RULE3, ?RULE4]], Rules3 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE3, ?RULE4])],
Rules4 = [emqx_authz:init_rule(Rule) || Rule <- [?RULE4, ?RULE1]], Rules4 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE4, ?RULE1])],
?assertEqual({stop, deny}, ?assertEqual({stop, deny},
emqx_authz:authorize(ClientInfo1, subscribe, <<"#">>, deny, [])), emqx_authz:authorize(ClientInfo1, subscribe, <<"#">>, deny, [])),

View File

@ -18,143 +18,207 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
% -include("emqx_authz.hrl"). -include("emqx_authz.hrl").
% -include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
% -include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
% -import(emqx_ct_http, [ request_api/3 -import(emqx_ct_http, [ request_api/3
% , request_api/5 , request_api/5
% , get_http_data/1 , get_http_data/1
% , create_default_app/0 , create_default_app/0
% , delete_default_app/0 , delete_default_app/0
% , default_auth_header/0 , default_auth_header/0
% ]). , auth_header/2
]).
% -define(HOST, "http://127.0.0.1:8081/"). -define(HOST, "http://127.0.0.1:8081/").
% -define(API_VERSION, "v4"). -define(API_VERSION, "v5").
% -define(BASE_PATH, "api"). -define(BASE_PATH, "api").
-define(CONF_DEFAULT, <<""" -define(RULE1, #{<<"principal">> => <<"all">>,
authorization:{ <<"topics">> => [<<"#">>],
rules: [ <<"action">> => <<"all">>,
] <<"permission">> => <<"deny">>}
} ).
""">>). -define(RULE2, #{<<"principal">> =>
#{<<"ipaddress">> => <<"127.0.0.1">>},
<<"topics">> =>
[#{<<"eq">> => <<"#">>},
#{<<"eq">> => <<"+">>}
] ,
<<"action">> => <<"all">>,
<<"permission">> => <<"allow">>}
).
-define(RULE3,#{<<"principal">> =>
#{<<"and">> => [#{<<"username">> => <<"^test?">>},
#{<<"clientid">> => <<"^test?">>}
]},
<<"topics">> => [<<"test">>],
<<"action">> => <<"publish">>,
<<"permission">> => <<"allow">>}
).
-define(RULE4,#{<<"principal">> =>
#{<<"or">> => [#{<<"username">> => <<"^test">>},
#{<<"clientid">> => <<"test?">>}
]},
<<"topics">> => [<<"%u">>,<<"%c">>],
<<"action">> => <<"publish">>,
<<"permission">> => <<"deny">>}
).
all() -> all() ->
%% TODO: V5 API emqx_ct:all(?MODULE).
%% emqx_ct:all(?MODULE).
[t_api_unit_test].
groups() -> groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), ekka_mnesia:start(),
ok = emqx_ct_helpers:start_apps([emqx_authz]), emqx_mgmt_auth:mnesia(boot),
ok = emqx_ct_helpers:start_apps([emqx_management, emqx_authz], fun set_special_configs/1),
ok = emqx_config:update([zones, default, authorization, cache, enable], false),
ok = emqx_config:update([zones, default, authorization, enable], true),
%create_default_app(),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = emqx_authz:update(replace, []), ok = emqx_authz:update(replace, []),
emqx_ct_helpers:stop_apps([emqx_authz]), emqx_ct_helpers:stop_apps([emqx_authz, emqx_management]),
ok. ok.
% set_special_configs(emqx) -> set_special_configs(emqx_management) ->
% application:set_env(emqx, allow_anonymous, true), emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
% application:set_env(emqx, enable_authz_cache, false), applications =>[#{id => "admin", secret => "public"}]}),
% ok; ok;
% set_special_configs(emqx_authz) -> set_special_configs(emqx_authz) ->
% emqx_config:put([emqx_authz], #{rules => []}), emqx_config:put([authorization], #{rules => []}),
% ok; ok;
set_special_configs(_App) ->
ok.
% set_special_configs(emqx_management) -> %%------------------------------------------------------------------------------
% emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], %% Testcases
% applications =>[#{id => "admin", secret => "public"}]}), %%------------------------------------------------------------------------------
% ok;
% set_special_configs(_App) -> t_api(_) ->
% ok. {ok, 200, Result1} = request(get, uri(["authorization"]), []),
?assertEqual([], get_rules(Result1)),
% %%------------------------------------------------------------------------------ lists:foreach(fun(_) ->
% %% Testcases {ok, 204, _} = request(post, uri(["authorization"]),
% %%------------------------------------------------------------------------------ #{<<"action">> => <<"all">>,
<<"permission">> => <<"deny">>,
<<"principal">> => <<"all">>,
<<"topics">> => [<<"#">>]}
)
end, lists:seq(1, 20)),
{ok, 200, Result2} = request(get, uri(["authorization"]), []),
?assertEqual(20, length(get_rules(Result2))),
t_api_unit_test(_Config) -> lists:foreach(fun(Page) ->
%% TODO: Decode from JSON or HOCON, instead of hand-crafting decode result Query = "?page=" ++ integer_to_list(Page) ++ "&&limit=10",
Rule1 = #{<<"principal">> => Url = uri(["authorization" ++ Query]),
#{<<"and">> => [#{<<"username">> => <<"^test?">>}, {ok, 200, Result} = request(get, Url, []),
#{<<"clientid">> => <<"^test?">>} ?assertEqual(10, length(get_rules(Result)))
]}, end, lists:seq(1, 2)),
<<"action">> => <<"subscribe">>,
<<"topics">> => [<<"%u">>],
<<"permission">> => <<"allow">>
},
ok = emqx_authz_api:push_authz(#{}, Rule1),
[#{action := subscribe,
permission := allow,
principal :=
#{'and' := [#{username := <<"^test?">>},
#{clientid := <<"^test?">>}]},
topics := [<<"%u">>]}] = emqx_config:get([authorization, rules]).
% t_api(_Config) -> {ok, 204, _} = request(put, uri(["authorization"]),
% Rule1 = #{<<"principal">> => [ #{<<"action">> => <<"all">>, <<"permission">> => <<"allow">>, <<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]}
% #{<<"and">> => [#{<<"username">> => <<"^test?">>}, , #{<<"action">> => <<"all">>, <<"permission">> => <<"allow">>, <<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]}
% #{<<"clientid">> => <<"^test?">>} , #{<<"action">> => <<"all">>, <<"permission">> => <<"allow">>, <<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]}
% ]}, ]),
% <<"action">> => <<"subscribe">>,
% <<"topics">> => [<<"%u">>],
% <<"permission">> => <<"allow">>
% },
% {ok, _} = request_http_rest_add(["authz/push"], #{rules => [Rule1]}),
% {ok, Result1} = request_http_rest_lookup(["authz"]),
% ?assertMatch([Rule1 | _ ], get_http_data(Result1)),
% Rule2 = #{<<"principal">> => #{<<"ipaddress">> => <<"127.0.0.1">>}, {ok, 200, Result3} = request(get, uri(["authorization"]), []),
% <<"action">> => <<"publish">>, Rules = get_rules(Result3),
% <<"topics">> => [#{<<"eq">> => <<"#">>}, ?assertEqual(3, length(Rules)),
% #{<<"eq">> => <<"+">>}
% ],
% <<"permission">> => <<"deny">>
% },
% {ok, _} = request_http_rest_add(["authz/append"], #{rules => [Rule2]}),
% {ok, Result2} = request_http_rest_lookup(["authz"]),
% ?assertEqual(Rule2#{<<"principal">> => #{<<"ipaddress">> => "127.0.0.1"}},
% lists:last(get_http_data(Result2))),
% {ok, _} = request_http_rest_update(["authz"], #{rules => []}), lists:foreach(fun(#{<<"permission">> := Allow}) ->
% {ok, Result3} = request_http_rest_lookup(["authz"]), ?assertEqual(<<"allow">>, Allow)
% ?assertEqual([], get_http_data(Result3)), end, Rules),
% ok.
% %%-------------------------------------------------------------------- #{<<"annotations">> := #{<<"id">> := Id}} = lists:nth(2, Rules),
% %% HTTP Request
% %%--------------------------------------------------------------------
% request_http_rest_list(Path) -> {ok, 204, _} = request(put, uri(["authorization", binary_to_list(Id)]),
% request_api(get, uri(Path), default_auth_header()). #{<<"action">> => <<"all">>, <<"permission">> => <<"deny">>,
<<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]}),
% request_http_rest_lookup(Path) -> {ok, 200, Result4} = request(get, uri(["authorization", binary_to_list(Id)]), []),
% request_api(get, uri([Path]), default_auth_header()). ?assertMatch(#{<<"annotations">> := #{<<"id">> := Id},
<<"permission">> := <<"deny">>
}, jsx:decode(Result4)),
% request_http_rest_add(Path, Params) -> lists:foreach(fun(#{<<"annotations">> := #{<<"id">> := Id}}) ->
% request_api(post, uri(Path), [], default_auth_header(), Params). {ok, 204, _} = request(delete, uri(["authorization", binary_to_list(Id)]), [])
end, Rules),
{ok, 200, Result5} = request(get, uri(["authorization"]), []),
?assertEqual([], get_rules(Result5)),
ok.
% request_http_rest_update(Path, Params) -> t_move_rule(_) ->
% request_api(put, uri([Path]), [], default_auth_header(), Params). ok = emqx_authz:update(replace, [?RULE1, ?RULE2, ?RULE3, ?RULE4]),
[#{annotations := #{id := Id1}},
#{annotations := #{id := Id2}},
#{annotations := #{id := Id3}},
#{annotations := #{id := Id4}}
] = emqx_authz:lookup(),
% request_http_rest_delete(Login) -> {ok, 204, _} = request(post, uri(["authorization", Id4, "move"]),
% request_api(delete, uri([Login]), default_auth_header()). #{<<"position">> => <<"top">>}),
?assertMatch([#{annotations := #{id := Id4}},
#{annotations := #{id := Id1}},
#{annotations := #{id := Id2}},
#{annotations := #{id := Id3}}
], emqx_authz:lookup()),
% uri() -> uri([]). {ok, 204, _} = request(post, uri(["authorization", Id1, "move"]),
% uri(Parts) when is_list(Parts) -> #{<<"position">> => <<"bottom">>}),
% NParts = [b2l(E) || E <- Parts], ?assertMatch([#{annotations := #{id := Id4}},
% ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]). #{annotations := #{id := Id2}},
#{annotations := #{id := Id3}},
#{annotations := #{id := Id1}}
], emqx_authz:lookup()),
% %% @private {ok, 204, _} = request(post, uri(["authorization", Id3, "move"]),
% b2l(B) when is_binary(B) -> #{<<"position">> => #{<<"before">> => Id4}}),
% binary_to_list(B); ?assertMatch([#{annotations := #{id := Id3}},
% b2l(L) when is_list(L) -> #{annotations := #{id := Id4}},
% L. #{annotations := #{id := Id2}},
#{annotations := #{id := Id1}}
], emqx_authz:lookup()),
{ok, 204, _} = request(post, uri(["authorization", Id2, "move"]),
#{<<"position">> => #{<<"after">> => Id1}}),
?assertMatch([#{annotations := #{id := Id3}},
#{annotations := #{id := Id4}},
#{annotations := #{id := Id1}},
#{annotations := #{id := Id2}}
], emqx_authz:lookup()),
ok.
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
request(Method, Url, Body) ->
Request = case Body of
[] -> {Url, [auth_header("admin", "public")]};
_ -> {Url, [auth_header("admin", "public")], "application/json", jsx:encode(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
get_rules(Result) ->
maps:get(<<"rules">>, jsx:decode(Result), []).

View File

@ -31,6 +31,7 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_ct_helpers:start_apps([emqx_authz]), ok = emqx_ct_helpers:start_apps([emqx_authz]),
ok = emqx_config:update([zones, default, authorization, cache, enable], false), ok = emqx_config:update([zones, default, authorization, cache, enable], false),

View File

@ -31,6 +31,7 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_ct_helpers:start_apps([emqx_authz]), ok = emqx_ct_helpers:start_apps([emqx_authz]),

View File

@ -31,6 +31,7 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_ct_helpers:start_apps([emqx_authz]), ok = emqx_ct_helpers:start_apps([emqx_authz]),

View File

@ -31,6 +31,7 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_ct_helpers:start_apps([emqx_authz]), ok = emqx_ct_helpers:start_apps([emqx_authz]),

View File

@ -97,6 +97,14 @@ users_api() ->
responses => #{ responses => #{
<<"200">> => response_array_schema(<<"">>, show_user) <<"200">> => response_array_schema(<<"">>, show_user)
} }
},
post => #{
description => <<"Create dashboard users">>,
'requestBody' => request_body_schema(create_user),
responses => #{
<<"200">> => response_schema(<<"Create Users successfully">>),
<<"400">> => bad_request()
}
} }
}, },
{"/users", Metadata, users}. {"/users", Metadata, users}.
@ -117,7 +125,7 @@ user_api() ->
'requestBody' => request_body_schema(#{ 'requestBody' => request_body_schema(#{
type => object, type => object,
properties => #{ properties => #{
tags => #{ tag => #{
type => string type => string
} }
} }
@ -126,15 +134,6 @@ user_api() ->
<<"200">> => response_schema(<<"Update Users successfully">>), <<"200">> => response_schema(<<"Update Users successfully">>),
<<"400">> => bad_request() <<"400">> => bad_request()
} }
},
post => #{
description => <<"Create dashboard users">>,
parameters => [path_param_username()],
'requestBody' => request_body_schema(create_user),
responses => #{
<<"200">> => response_schema(<<"Create Users successfully">>),
<<"400">> => bad_request()
}
} }
}, },
{"/users/:username", Metadata, user}. {"/users/:username", Metadata, user}.
@ -161,7 +160,7 @@ change_pwd_api() ->
} }
} }
}, },
{"/change_pwd/:username", Metadata, change_pwd}. {"/users/:username/change_pwd", Metadata, change_pwd}.
path_param_username() -> path_param_username() ->
#{ #{
@ -187,14 +186,32 @@ auth(post, Request) ->
end. end.
users(get, _Request) -> users(get, _Request) ->
{200, [row(User) || User <- emqx_dashboard_admin:all_users()]}. {200, [row(User) || User <- emqx_dashboard_admin:all_users()]};
users(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
Tag = maps:get(<<"tag">>, Params),
Username = maps:get(<<"username">>, Params),
Password = maps:get(<<"password">>, Params),
case ?EMPTY(Username) orelse ?EMPTY(Password) of
true ->
{400, #{code => <<"CREATE_USER_FAIL">>,
message => <<"Username or password undefined">>}};
false ->
case emqx_dashboard_admin:add_user(Username, Password, Tag) of
ok -> {200};
{error, Reason} ->
{400, #{code => <<"CREATE_USER_FAIL">>, message => Reason}}
end
end.
user(put, Request) -> user(put, Request) ->
Username = cowboy_req:binding(username, Request), Username = cowboy_req:binding(username, Request),
{ok, Body, _} = cowboy_req:read_body(Request), {ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]), Params = emqx_json:decode(Body, [return_maps]),
Tags = maps:get(<<"tags">>, Params), Tag = maps:get(<<"tag">>, Params),
case emqx_dashboard_admin:update_user(Username, Tags) of case emqx_dashboard_admin:update_user(Username, Tag) of
ok -> {200}; ok -> {200};
{error, Reason} -> {error, Reason} ->
{400, #{code => <<"UPDATE_FAIL">>, message => Reason}} {400, #{code => <<"UPDATE_FAIL">>, message => Reason}}
@ -208,24 +225,6 @@ user(delete, Request) ->
false -> false ->
_ = emqx_dashboard_admin:remove_user(Username), _ = emqx_dashboard_admin:remove_user(Username),
{200} {200}
end;
user(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
Tags = maps:get(<<"tags">>, Params),
Username = maps:get(<<"username">>, Params),
Password = maps:get(<<"password">>, Params),
case ?EMPTY(Username) orelse ?EMPTY(Password) of
true ->
{400, #{code => <<"CREATE_USER_FAIL">>,
message => <<"Username or password undefined">>}};
false ->
case emqx_dashboard_admin:add_user(Username, Password, Tags) of
ok -> {200};
{error, Reason} ->
{400, #{code => <<"CREATE_USER_FAIL">>, message => Reason}}
end
end. end.
change_pwd(put, Request) -> change_pwd(put, Request) ->
@ -240,8 +239,8 @@ change_pwd(put, Request) ->
{400, #{code => <<"CHANGE_PWD_FAIL">>, message => Reason}} {400, #{code => <<"CHANGE_PWD_FAIL">>, message => Reason}}
end. end.
row(#mqtt_admin{username = Username, tags = Tags}) -> row(#mqtt_admin{username = Username, tags = Tag}) ->
#{username => Username, tags => Tags}. #{username => Username, tag => Tag}.
bad_request() -> bad_request() ->
response_schema(<<"Bad Request">>, response_schema(<<"Bad Request">>,

View File

@ -56,7 +56,7 @@ start_link() ->
get_collect() -> gen_server:call(whereis(?MODULE), get_collect). get_collect() -> gen_server:call(whereis(?MODULE), get_collect).
init([]) -> init([]) ->
timer(timer:seconds(interval()), collect), timer(next_interval(), collect),
timer(get_today_remaining_seconds(), clear_expire_data), timer(get_today_remaining_seconds(), clear_expire_data),
ExpireInterval = emqx_config:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL), ExpireInterval = emqx_config:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL),
State = #{ State = #{
@ -68,6 +68,15 @@ init([]) ->
}, },
{ok, State}. {ok, State}.
%% @doc every whole interval seconds;
%% example:
%% interval is 10s
%% now 15:01:07 (or 15:07:01 ~ 15:07:10)
%% next will be 15:01:10, 15:01:20, 15:01:30 ...
%% ensure all counters in cluster have sync time
next_interval() ->
(1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1.
interval() -> interval() ->
emqx_config:get([?APP, sample_interval], ?DEFAULT_INTERVAL). emqx_config:get([?APP, sample_interval], ?DEFAULT_INTERVAL).
@ -82,17 +91,17 @@ handle_cast(_Req, State) ->
{noreply, State}. {noreply, State}.
handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) -> handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) ->
timer(next_interval(), collect),
NewLastCollect = flush(collect_all(Collect), LastCollect), NewLastCollect = flush(collect_all(Collect), LastCollect),
TempCollect1 = temp_collect(TempCollect), TempCollect1 = temp_collect(TempCollect),
timer(timer:seconds(interval()), collect),
{noreply, State#{count => count(), {noreply, State#{count => count(),
collect => ?COLLECT, collect => ?COLLECT,
temp_collect => TempCollect1, temp_collect => TempCollect1,
last_collects => NewLastCollect}}; last_collects => NewLastCollect}};
handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) -> handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) ->
timer(next_interval(), collect),
TempCollect1 = temp_collect(TempCollect), TempCollect1 = temp_collect(TempCollect),
timer(timer:seconds(interval()), collect),
{noreply, State#{count => Count - 1, {noreply, State#{count => Count - 1,
collect => collect_all(Collect), collect => collect_all(Collect),
temp_collect => TempCollect1}, hibernate}; temp_collect => TempCollect1}, hibernate};
@ -170,4 +179,4 @@ get_today_remaining_seconds() ->
get_local_time() -> get_local_time() ->
(calendar:datetime_to_gregorian_seconds(calendar:local_time()) - (calendar:datetime_to_gregorian_seconds(calendar:local_time()) -
calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})) * 1000. calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})).

View File

@ -8,11 +8,6 @@
-behaviour(minirest_api). -behaviour(minirest_api).
-export([ sampling/1
, sampling/2
, get_collect/1
]).
-export([api_spec/0]). -export([api_spec/0]).
-export([counters/2, current_counters/2]). -export([counters/2, current_counters/2]).
@ -36,8 +31,7 @@ monitor_api() ->
name => node, name => node,
in => query, in => query,
required => false, required => false,
schema => #{type => string}, schema => #{type => string}
example => node()
}, },
#{ #{
name => counter, name => counter,
@ -47,7 +41,7 @@ monitor_api() ->
} }
], ],
responses => #{ responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"Monitor count data">>, counters)}}}, <<"200">> => emqx_mgmt_util:response_schema(<<"Monitor count data">>, counters)}}},
{"/monitor", Metadata, counters}. {"/monitor", Metadata, counters}.
monitor_current_api() -> monitor_current_api() ->
Metadata = #{ Metadata = #{
@ -62,9 +56,6 @@ current_counters_schema() ->
#{ #{
type => object, type => object,
properties => #{ properties => #{
nodes => #{
type => integer,
description => <<"Nodes count">>},
connection => #{type => integer}, connection => #{type => integer},
sent => #{type => integer}, sent => #{type => integer},
received => #{type => integer}, received => #{type => integer},
@ -72,13 +63,11 @@ current_counters_schema() ->
}. }.
counters_schema() -> counters_schema() ->
Node = #{ Fun =
node => #{ fun(K, M) ->
type => string, maps:merge(M, counters_schema(K))
example => node() end,
} Properties = lists:foldl(Fun, #{}, ?COUNTERS),
},
Properties = lists:foldl(fun(K, M) -> maps:merge(M, counters_schema(K)) end, Node, ?COUNTERS),
#{ #{
counters => #{ counters => #{
type => object, type => object,
@ -100,8 +89,7 @@ counters_schema(Name) ->
counters(get, Request) -> counters(get, Request) ->
case cowboy_req:parse_qs(Request) of case cowboy_req:parse_qs(Request) of
[] -> [] ->
Response = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], {200, get_collect()};
{200, Response};
Params -> Params ->
lookup(Params) lookup(Params)
end. end.
@ -144,6 +132,10 @@ format_current_metrics([], Acc) ->
format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) -> format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) ->
format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}). format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}).
get_collect() ->
Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()],
merger_counters(Counters).
get_collect(Node) when Node =:= node() -> get_collect(Node) when Node =:= node() ->
emqx_dashboard_collection:get_collect(); emqx_dashboard_collection:get_collect();
get_collect(Node) -> get_collect(Node) ->
@ -152,17 +144,61 @@ get_collect(Node) ->
Res -> Res Res -> Res
end. end.
merger_counters(ClusterCounters) ->
lists:foldl(fun merger_node_counters/2, #{}, ClusterCounters).
merger_node_counters(NodeCounters, Counters) ->
maps:fold(fun merger_counter/3, Counters, NodeCounters).
merger_counter(Key, Counters, Res) ->
case maps:get(Key, Res, undefined) of
undefined ->
Res#{Key => Counters};
OldCounters ->
NCounters = lists:foldl(fun merger_counter/2, OldCounters, Counters),
Res#{Key => NCounters}
end.
merger_counter(#{timestamp := Timestamp, count := Value}, Counters) ->
Comparison =
fun(Counter) ->
case maps:get(timestamp, Counter) =:= Timestamp of
true ->
Count = maps:get(count, Counter),
{ok, Counter#{count => Count + Value}};
false ->
ignore
end
end,
key_replace(Counters, Comparison, #{timestamp => Timestamp, count => Value}).
key_replace(List, Comparison, Default) ->
key_replace(List, List, Comparison, Default).
key_replace([], All, _Comparison, Default) ->
[Default | All];
key_replace([Term | List], All, Comparison, Default) ->
case Comparison(Term) of
{ok, NTerm} ->
Tail = [NTerm | List],
Header = lists:sublist(All, length(All) - length(Tail)),
lists:append(Header, Tail);
_ ->
key_replace(List, All, Comparison, Default)
end.
sampling(Node) when Node =:= node() -> sampling(Node) when Node =:= node() ->
Time = emqx_dashboard_collection:get_local_time() - 7200000, Time = emqx_dashboard_collection:get_local_time() - 7200000,
All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]),
maps:put(node, Node, format(lists:sort(All))); format(lists:sort(All));
sampling(Node) -> sampling(Node) ->
rpc:call(Node, ?MODULE, sampling, [Node]). rpc:call(Node, ?MODULE, sampling, [Node]).
sampling(Node, Counter) when Node =:= node() -> sampling(Node, Counter) when Node =:= node() ->
Time = emqx_dashboard_collection:get_local_time() - 7200000, Time = emqx_dashboard_collection:get_local_time() - 7200000,
All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]),
maps:put(node, Node, format_single(lists:sort(All), Counter)); format_single(lists:sort(All), Counter);
sampling(Node, Counter) -> sampling(Node, Counter) ->
rpc:call(Node, ?MODULE, sampling, [Node, Counter]). rpc:call(Node, ?MODULE, sampling, [Node, Counter]).

View File

@ -96,7 +96,7 @@ t_rest_api(_Config) ->
, http_post("users", #{<<"username">> => <<"usera">>, <<"password">> => <<"passwd">>}) , http_post("users", #{<<"username">> => <<"usera">>, <<"password">> => <<"passwd">>})
, http_post("auth", #{<<"username">> => <<"usera">>, <<"password">> => <<"passwd">>}) , http_post("auth", #{<<"username">> => <<"usera">>, <<"password">> => <<"passwd">>})
, http_delete("users/usera") , http_delete("users/usera")
, http_put("change_pwd/admin", #{<<"old_pwd">> => <<"public">>, <<"new_pwd">> => <<"newpwd">>}) , http_put("users/admin/change_pwd", #{<<"old_pwd">> => <<"public">>, <<"new_pwd">> => <<"newpwd">>})
, http_post("auth", #{<<"username">> => <<"admin">>, <<"password">> => <<"newpwd">>}) , http_post("auth", #{<<"username">> => <<"admin">>, <<"password">> => <<"newpwd">>})
]], ]],
ok. ok.

View File

@ -426,16 +426,16 @@ log {
## Limits the total number of characters printed for each log event. ## Limits the total number of characters printed for each log event.
## ##
## @doc log.chars_limit ## @doc log.chars_limit
## ValueType: Integer | infinity ## ValueType: unlimited | Integer
## Range: [0, infinity) ## Range: [0, +Inf)
## Default: infinity ## Default: unlimited
chars_limit: infinity chars_limit: unlimited
## Maximum depth for Erlang term log formatting ## Maximum depth for Erlang term log formatting
## and Erlang process message queue inspection. ## and Erlang process message queue inspection.
## ##
## @doc log.max_depth ## @doc log.max_depth
## ValueType: Integer | infinity ## ValueType: unlimited | Integer
## Default: 80 ## Default: 80
max_depth: 80 max_depth: 80

View File

@ -138,7 +138,32 @@ start_one_app(App) ->
%% 1. due to static static config change %% 1. due to static static config change
%% 2. after join a cluster %% 2. after join a cluster
reboot_apps() -> reboot_apps() ->
[gproc, esockd, ranch, cowboy, ekka, emqx]. %% | ?EMQX_DEP_APPS]. [ gproc
, esockd
, ranch
, cowboy
, ekka
, emqx
, emqx_prometheus
, emqx_modules
, emqx_dashboard
, emqx_connector
, emqx_gateway
, emqx_statsd
, emqx_resource
, emqx_rule_engine
, emqx_data_bridge
, emqx_bridge_mqtt
, emqx_plugin_libs
, emqx_config_helper
, emqx_management
, emqx_release_helper
, emqx_retainer
, emqx_exhook
, emqx_rule_actions
, emqx_authn
, emqx_authz
].
sorted_reboot_apps() -> sorted_reboot_apps() ->
Apps = [{App, app_deps(App)} || App <- reboot_apps()], Apps = [{App, app_deps(App)} || App <- reboot_apps()],

View File

@ -161,9 +161,11 @@ fields("log") ->
, {"console_handler", ref("console_handler")} , {"console_handler", ref("console_handler")}
, {"file_handlers", ref("file_handlers")} , {"file_handlers", ref("file_handlers")}
, {"time_offset", t(string(), undefined, "system")} , {"time_offset", t(string(), undefined, "system")}
, {"chars_limit", maybe_infinity(range(1, inf))} , {"chars_limit", #{type => hoconsc:union([unlimited, range(1, inf)]),
default => unlimited
}}
, {"supervisor_reports", t(union([error, progress]), undefined, error)} , {"supervisor_reports", t(union([error, progress]), undefined, error)}
, {"max_depth", t(union([infinity, integer()]), , {"max_depth", t(union([unlimited, integer()]),
"kernel.error_logger_format_depth", 80)} "kernel.error_logger_format_depth", 80)}
, {"formatter", t(union([text, json]), undefined, text)} , {"formatter", t(union([text, json]), undefined, text)}
, {"single_line", t(boolean(), undefined, true)} , {"single_line", t(boolean(), undefined, true)}
@ -188,7 +190,8 @@ fields("log_file_handler") ->
[ {"level", t(log_level(), undefined, warning)} [ {"level", t(log_level(), undefined, warning)}
, {"file", t(file(), undefined, undefined)} , {"file", t(file(), undefined, undefined)}
, {"rotation", ref("log_rotation")} , {"rotation", ref("log_rotation")}
, {"max_size", maybe_infinity(emqx_schema:bytesize(), "10MB")} , {"max_size", #{type => union([infinity, emqx_schema:bytesize()]),
default => "10MB"}}
]; ];
fields("log_rotation") -> fields("log_rotation") ->
@ -258,8 +261,8 @@ tr_logger_level(Conf) -> conf_get("log.primary_level", Conf).
tr_logger(Conf) -> tr_logger(Conf) ->
CharsLimit = case conf_get("log.chars_limit", Conf) of CharsLimit = case conf_get("log.chars_limit", Conf) of
infinity -> unlimited; unlimited -> unlimited;
V -> V V when V > 0 -> V
end, end,
SingleLine = conf_get("log.single_line", Conf), SingleLine = conf_get("log.single_line", Conf),
FmtName = conf_get("log.formatter", Conf), FmtName = conf_get("log.formatter", Conf),
@ -378,15 +381,6 @@ t(Type, Mapping, Default, OverrideEnv) ->
ref(Field) -> hoconsc:t(hoconsc:ref(Field)). ref(Field) -> hoconsc:t(hoconsc:ref(Field)).
maybe_infinity(T) ->
maybe_sth(infinity, T, infinity).
maybe_infinity(T, Default) ->
maybe_sth(infinity, T, Default).
maybe_sth(What, Type, Default) ->
t(union([What, Type]), undefined, Default).
options(static, Conf) -> options(static, Conf) ->
[{seeds, [to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, [])]}]; [{seeds, [to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, [])]}];
options(mcast, Conf) -> options(mcast, Conf) ->

View File

@ -283,7 +283,7 @@ list_client_subscriptions(ClientId) ->
end. end.
client_subscriptions(Node, ClientId) when Node =:= node() -> client_subscriptions(Node, ClientId) when Node =:= node() ->
emqx_broker:subscriptions(ClientId); {Node, emqx_broker:subscriptions(ClientId)};
client_subscriptions(Node, ClientId) -> client_subscriptions(Node, ClientId) ->
rpc_call(Node, client_subscriptions, [Node, ClientId]). rpc_call(Node, client_subscriptions, [Node, ClientId]).

View File

@ -220,16 +220,32 @@ clients_api() ->
get => #{ get => #{
description => <<"List clients">>, description => <<"List clients">>,
parameters => [ parameters => [
#{
name => page,
in => query,
required => false,
description => <<"Page">>,
schema => #{type => integer}
},
#{
name => limit,
in => query,
required => false,
description => <<"Page limit">>,
schema => #{type => integer}
},
#{ #{
name => node, name => node,
in => query, in => query,
required => false, required => false,
description => <<"Node name">>,
schema => #{type => string} schema => #{type => string}
}, },
#{ #{
name => username, name => username,
in => query, in => query,
required => false, required => false,
description => <<"User name">>,
schema => #{type => string} schema => #{type => string}
}, },
#{ #{
@ -242,66 +258,77 @@ clients_api() ->
name => ip_address, name => ip_address,
in => query, in => query,
required => false, required => false,
description => <<"IP address">>,
schema => #{type => string} schema => #{type => string}
}, },
#{ #{
name => conn_state, name => conn_state,
in => query, in => query,
required => false, required => false,
schema => #{type => string} description => <<"The current connection status of the client, the possible values are connected,idle,disconnected">>,
schema => #{type => string, enum => [connected, idle, disconnected]}
}, },
#{ #{
name => clean_start, name => clean_start,
in => query, in => query,
required => false, required => false,
schema => #{type => string} description => <<"Whether the client uses a new session">>,
schema => #{type => boolean}
}, },
#{ #{
name => proto_name, name => proto_name,
in => query, in => query,
required => false, required => false,
schema => #{type => string} description => <<"Client protocol name, the possible values are MQTT,CoAP,LwM2M,MQTT-SN">>,
schema => #{type => string, enum => ['MQTT', 'CoAP', 'LwM2M', 'MQTT-SN']}
}, },
#{ #{
name => proto_ver, name => proto_ver,
in => query, in => query,
required => false, required => false,
description => <<"Client protocol version">>,
schema => #{type => string} schema => #{type => string}
}, },
#{ #{
name => like_clientid, name => like_clientid,
in => query, in => query,
required => false, required => false,
description => <<"Fuzzy search of client identifier by substring method">>,
schema => #{type => string} schema => #{type => string}
}, },
#{ #{
name => like_username, name => like_username,
in => query, in => query,
required => false, required => false,
description => <<"Client user name, fuzzy search by substring">>,
schema => #{type => string} schema => #{type => string}
}, },
#{ #{
name => gte_created_at, name => gte_created_at,
in => query, in => query,
required => false, required => false,
description => <<"Search client session creation time by less than or equal method">>,
schema => #{type => string} schema => #{type => string}
}, },
#{ #{
name => lte_created_at, name => lte_created_at,
in => query, in => query,
required => false, required => false,
description => <<"Search client session creation time by greater than or equal method">>,
schema => #{type => string} schema => #{type => string}
}, },
#{ #{
name => gte_connected_at, name => gte_connected_at,
in => query, in => query,
required => false, required => false,
description => <<"Search client connection creation time by less than or equal method">>,
schema => #{type => string} schema => #{type => string}
}, },
#{ #{
name => lte_connected_at, name => lte_connected_at,
in => query, in => query,
required => false, required => false,
description => <<"Search client connection creation time by greater than or equal method">>,
schema => #{type => string} schema => #{type => string}
} }
], ],
@ -362,15 +389,6 @@ clients_authz_cache_api() ->
{"/clients/:clientid/authz_cache", Metadata, authz_cache}. {"/clients/:clientid/authz_cache", Metadata, authz_cache}.
clients_subscriptions_api() -> clients_subscriptions_api() ->
SubscriptionSchema = #{
type => object,
properties => #{
topic => #{
type => string},
qos => #{
type => integer,
enum => [0,1,2]}}
},
Metadata = #{ Metadata = #{
get => #{ get => #{
description => <<"Get client subscriptions">>, description => <<"Get client subscriptions">>,
@ -382,7 +400,7 @@ clients_subscriptions_api() ->
}], }],
responses => #{ responses => #{
<<"200">> => <<"200">> =>
emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, SubscriptionSchema)}} emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, subscription)}}
}, },
{"/clients/:clientid/subscriptions", Metadata, subscriptions}. {"/clients/:clientid/subscriptions", Metadata, subscriptions}.
@ -486,9 +504,9 @@ subscribe_batch(post, Request) ->
subscriptions(get, Request) -> subscriptions(get, Request) ->
ClientID = cowboy_req:binding(clientid, Request), ClientID = cowboy_req:binding(clientid, Request),
Subs0 = emqx_mgmt:list_client_subscriptions(ClientID), {Node, Subs0} = emqx_mgmt:list_client_subscriptions(ClientID),
Subs = lists:map(fun({Topic, SubOpts}) -> Subs = lists:map(fun({Topic, SubOpts}) ->
#{topic => Topic, qos => maps:get(qos, SubOpts)} #{node => Node, clientid => ClientID, topic => Topic, qos => maps:get(qos, SubOpts)}
end, Subs0), end, Subs0),
{200, Subs}. {200, Subs}.

View File

@ -0,0 +1,189 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_configs).
-behaviour(minirest_api).
-export([api_spec/0]).
-export([ config/2
, config_reset/2
]).
-define(PARAM_CONF_PATH, [#{
name => conf_path,
in => query,
description => <<"The config path separated by '.' character">>,
required => false,
schema => #{type => string, default => <<".">>}
}]).
-define(TEXT_BODY(DESCR, SCHEMA), #{
description => list_to_binary(DESCR),
content => #{
<<"text/plain">> => #{
schema => SCHEMA
}
}
}).
-define(PREFIX, "/configs").
-define(PREFIX_RESET, "/configs_reset").
-define(MAX_DEPTH, 1).
-define(ERR_MSG(MSG), io_lib:format("~p", [MSG])).
api_spec() ->
{config_apis() ++ [config_reset_api()], []}.
config_apis() ->
[config_api(ConfPath, Schema) || {ConfPath, Schema} <-
get_conf_schema(emqx_config:get([]), ?MAX_DEPTH)].
config_api(ConfPath, Schema) ->
Path = path_join(ConfPath),
Descr = fun(Str) ->
list_to_binary([Str, " ", path_join(ConfPath, ".")])
end,
Metadata = #{
get => #{
description => Descr("Get configs for"),
responses => #{
<<"200">> => ?TEXT_BODY("Get configs successfully", Schema),
<<"404">> => emqx_mgmt_util:response_error_schema(
<<"Config not found">>, ['NOT_FOUND'])
}
},
put => #{
description => Descr("Update configs for"),
'requestBody' => ?TEXT_BODY("The format of the request body is depend on the 'conf_path' parameter in the query string", Schema),
responses => #{
<<"200">> => ?TEXT_BODY("Update configs successfully", Schema),
<<"400">> => emqx_mgmt_util:response_error_schema(
<<"Update configs failed">>, ['UPDATE_FAILED'])
}
}
},
{?PREFIX ++ "/" ++ Path, Metadata, config}.
config_reset_api() ->
Metadata = #{
post => #{
description => <<"Reset the config entry specified by the query string parameter `conf_path`.<br/>
- For a config entry that has default value, this resets it to the default value;
- For a config entry that has no default value, an error 400 will be returned">>,
parameters => ?PARAM_CONF_PATH,
responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"Remove configs successfully">>),
<<"400">> => emqx_mgmt_util:response_error_schema(
<<"It's not able to reset the config">>, ['INVALID_OPERATION'])
}
}
},
{?PREFIX_RESET, Metadata, config_reset}.
%%%==============================================================================================
%% parameters trans
config(get, Req) ->
case emqx_config:find_raw(conf_path(Req)) of
{ok, Conf} ->
{200, Conf};
{not_found, _, _} ->
{404, #{code => 'NOT_FOUND', message => <<"Config cannot found">>}}
end;
config(put, Req) ->
Path = conf_path(Req),
ok = emqx_config:update(Path, http_body(Req)),
{200, emqx_config:get_raw(Path)}.
config_reset(post, Req) ->
%% reset the config specified by the query string param 'conf_path'
Path = conf_path_reset(Req),
case emqx_config:remove(Path ++ conf_path_from_querystr(Req)) of
ok -> {200, emqx_config:get_raw(Path)};
{error, Reason} ->
{400, ?ERR_MSG(Reason)}
end.
conf_path_from_querystr(Req) ->
case proplists:get_value(<<"conf_path">>, cowboy_req:parse_qs(Req)) of
undefined -> [];
Path -> string:lexemes(Path, ". ")
end.
conf_path(Req) ->
<<"/api/v5", ?PREFIX, Path/binary>> = cowboy_req:path(Req),
string:lexemes(Path, "/ ").
conf_path_reset(Req) ->
<<"/api/v5", ?PREFIX_RESET, Path/binary>> = cowboy_req:path(Req),
string:lexemes(Path, "/ ").
http_body(Req) ->
{ok, Body, _} = cowboy_req:read_body(Req),
try jsx:decode(Body, [{return_maps, true}])
catch error:badarg -> Body
end.
get_conf_schema(Conf, MaxDepth) ->
get_conf_schema([], maps:to_list(Conf), [], MaxDepth).
get_conf_schema(_BasePath, [], Result, _MaxDepth) ->
Result;
get_conf_schema(BasePath, [{Key, Conf} | Confs], Result, MaxDepth) ->
Path = BasePath ++ [Key],
Depth = length(Path),
Result1 = case is_map(Conf) of
true when Depth < MaxDepth ->
get_conf_schema(Path, maps:to_list(Conf), Result, MaxDepth);
true when Depth >= MaxDepth -> Result;
false -> Result
end,
get_conf_schema(BasePath, Confs, [{Path, gen_schema(Conf)} | Result1], MaxDepth).
%% TODO: generate from hocon schema
gen_schema(Conf) when is_boolean(Conf) ->
#{type => boolean};
gen_schema(Conf) when is_binary(Conf); is_atom(Conf) ->
#{type => string};
gen_schema(Conf) when is_number(Conf) ->
#{type => number};
gen_schema(Conf) when is_list(Conf) ->
#{type => array, items => case Conf of
[] -> #{}; %% don't know the type
_ -> gen_schema(hd(Conf))
end};
gen_schema(Conf) when is_map(Conf) ->
#{type => object, properties =>
maps:map(fun(_K, V) -> gen_schema(V) end, Conf)};
gen_schema(_Conf) ->
%% the conf is not of JSON supported type, it may have been converted
%% by the hocon schema
#{type => string}.
path_join(Path) ->
path_join(Path, "/").
path_join([P], _Sp) -> str(P);
path_join([P | Path], Sp) ->
str(P) ++ Sp ++ path_join(Path, Sp).
str(S) when is_list(S) -> S;
str(S) when is_binary(S) -> binary_to_list(S);
str(S) when is_atom(S) -> atom_to_list(S).

View File

@ -67,11 +67,17 @@ subscriptions_api() ->
description => <<"Client ID">>, description => <<"Client ID">>,
schema => #{type => string} schema => #{type => string}
}, },
#{
name => node,
in => query,
description => <<"Node name">>,
schema => #{type => string}
},
#{ #{
name => qos, name => qos,
in => query, in => query,
description => <<"QoS">>, description => <<"QoS">>,
schema => #{type => integer} schema => #{type => integer, enum => [0, 1, 2]}
}, },
#{ #{
name => share, name => share,
@ -101,6 +107,8 @@ subscription_schema() ->
subscription => #{ subscription => #{
type => object, type => object,
properties => #{ properties => #{
node => #{
type => string},
topic => #{ topic => #{
type => string}, type => string},
clientid => #{ clientid => #{
@ -115,8 +123,12 @@ subscriptions(get, Request) ->
list(Params). list(Params).
list(Params) -> list(Params) ->
{200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}. case proplists:get_value(<<"node">>, Params, undefined) of
undefined ->
{200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)};
Node ->
{200, emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), Params, ?SUBS_QS_SCHEMA, ?query_fun)}
end.
format(Items) when is_list(Items) -> format(Items) when is_list(Items) ->
[format(Item) || Item <- Items]; [format(Item) || Item <- Items];
@ -126,10 +138,20 @@ format({{Subscriber, Topic}, Options}) ->
format({_Subscriber, Topic, Options = #{share := Group}}) -> format({_Subscriber, Topic, Options = #{share := Group}}) ->
QoS = maps:get(qos, Options), QoS = maps:get(qos, Options),
#{topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS}; #{
topic => filename:join([<<"$share">>, Group, Topic]),
clientid => maps:get(subid, Options),
qos => QoS,
node => node()
};
format({_Subscriber, Topic, Options}) -> format({_Subscriber, Topic, Options}) ->
QoS = maps:get(qos, Options), QoS = maps:get(qos, Options),
#{topic => Topic, clientid => maps:get(subid, Options), qos => QoS}. #{
topic => Topic,
clientid => maps:get(subid, Options),
qos => QoS,
node => node()
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Query Function %% Query Function

View File

@ -48,6 +48,14 @@ start_listener({Proto, Port, Options}) ->
openapi => "3.0.0", openapi => "3.0.0",
info => #{title => "EMQ X API", version => "5.0.0"}, info => #{title => "EMQ X API", version => "5.0.0"},
servers => [#{url => ?BASE_PATH}], servers => [#{url => ?BASE_PATH}],
tags => [#{
name => configs,
description => <<"The query string parameter `conf_path` is of jq format.">>,
externalDocs => #{
description => "Find out more about the path syntax in jq",
url => "https://stedolan.github.io/jq/manual/"
}
}],
components => #{ components => #{
schemas => #{}, schemas => #{},
securitySchemes => #{ securitySchemes => #{

View File

@ -61,13 +61,12 @@ emqx_retainer: {
## Storage connect parameters ## Storage connect parameters
## ##
## Value: mnesia ## Value: built_in_database
## ##
connector:
[
{
type: mnesia
config: { config: {
type: built_in_database
## storage_type: ram | disc | disc_only ## storage_type: ram | disc | disc_only
storage_type: ram storage_type: ram
@ -76,6 +75,4 @@ emqx_retainer: {
## Value: Number >= 0 ## Value: Number >= 0
max_retained_messages: 0 max_retained_messages: 0
} }
}
]
} }

View File

@ -19,6 +19,6 @@
[{test, [{test,
[{deps, [{deps,
[ [
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}]} {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.0"}}}]}
]} ]}
]}. ]}.

View File

@ -338,16 +338,16 @@ get_msg_deliver_quota() ->
update_config(#{clear_timer := ClearTimer, update_config(#{clear_timer := ClearTimer,
release_quota_timer := QuotaTimer} = State, Conf) -> release_quota_timer := QuotaTimer} = State, Conf) ->
#{enable := Enable, #{enable := Enable,
connector := [Connector | _], config := Config,
flow_control := #{quota_release_interval := QuotaInterval}, flow_control := #{quota_release_interval := QuotaInterval},
msg_clear_interval := ClearInterval} = Conf, msg_clear_interval := ClearInterval} = Conf,
#{connector := [OldConnector | _]} = emqx_config:get([?APP]), #{config := OldConfig} = emqx_config:get([?APP]),
case Enable of case Enable of
true -> true ->
StorageType = maps:get(type, Connector), StorageType = maps:get(type, Config),
OldStrorageType = maps:get(type, OldConnector), OldStrorageType = maps:get(type, OldConfig),
case OldStrorageType of case OldStrorageType of
StorageType -> StorageType ->
State#{clear_timer := check_timer(ClearTimer, State#{clear_timer := check_timer(ClearTimer,
@ -368,9 +368,9 @@ update_config(#{clear_timer := ClearTimer,
enable_retainer(#{context_id := ContextId} = State, enable_retainer(#{context_id := ContextId} = State,
#{msg_clear_interval := ClearInterval, #{msg_clear_interval := ClearInterval,
flow_control := #{quota_release_interval := ReleaseInterval}, flow_control := #{quota_release_interval := ReleaseInterval},
connector := [Connector | _]}) -> config := Config}) ->
NewContextId = ContextId + 1, NewContextId = ContextId + 1,
Context = create_resource(new_context(NewContextId), Connector), Context = create_resource(new_context(NewContextId), Config),
load(Context), load(Context),
State#{enable := true, State#{enable := true,
context_id := NewContextId, context_id := NewContextId,
@ -416,14 +416,19 @@ check_timer(Timer, _, _) ->
-spec get_backend_module() -> backend(). -spec get_backend_module() -> backend().
get_backend_module() -> get_backend_module() ->
[#{type := Backend} | _] = emqx_config:get([?APP, connector]), #{type := Backend} = emqx_config:get([?APP, config]),
erlang:list_to_existing_atom(io_lib:format("~s_~s", [?APP, Backend])). ModName = if Backend =:= built_in_database ->
mnesia;
true ->
Backend
end,
erlang:list_to_existing_atom(io_lib:format("~s_~s", [?APP, ModName])).
create_resource(Context, #{type := mnesia, config := Cfg}) -> create_resource(Context, #{type := built_in_database} = Cfg) ->
emqx_retainer_mnesia:create_resource(Cfg), emqx_retainer_mnesia:create_resource(Cfg),
Context; Context;
create_resource(Context, #{type := DB, config := Config}) -> create_resource(Context, #{type := DB} = Config) ->
ResourceID = erlang:iolist_to_binary([io_lib:format("~s_~s", [?APP, DB])]), ResourceID = erlang:iolist_to_binary([io_lib:format("~s_~s", [?APP, DB])]),
case emqx_resource:create( case emqx_resource:create(
ResourceID, ResourceID,

View File

@ -33,8 +33,6 @@
-export([create_resource/1]). -export([create_resource/1]).
-define(DEF_MAX_RETAINED_MESSAGES, 0).
-rlog_shard({?RETAINER_SHARD, ?TAB}). -rlog_shard({?RETAINER_SHARD, ?TAB}).
-record(retained, {topic, msg, expiry_time}). -record(retained, {topic, msg, expiry_time}).
@ -229,10 +227,7 @@ make_match_spec(Filter) ->
-spec is_table_full() -> boolean(). -spec is_table_full() -> boolean().
is_table_full() -> is_table_full() ->
[#{config := Cfg} | _] = emqx_config:get([?APP, connector]), #{max_retained_messages := Limit} = emqx_config:get([?APP, config]),
Limit = maps:get(max_retained_messages,
Cfg,
?DEF_MAX_RETAINED_MESSAGES),
Limit > 0 andalso (table_size() >= Limit). Limit > 0 andalso (table_size() >= Limit).
-spec table_size() -> non_neg_integer(). -spec table_size() -> non_neg_integer().

View File

@ -12,18 +12,14 @@ fields("emqx_retainer") ->
[ {enable, t(boolean(), false)} [ {enable, t(boolean(), false)}
, {msg_expiry_interval, t(emqx_schema:duration_ms(), "0s")} , {msg_expiry_interval, t(emqx_schema:duration_ms(), "0s")}
, {msg_clear_interval, t(emqx_schema:duration_ms(), "0s")} , {msg_clear_interval, t(emqx_schema:duration_ms(), "0s")}
, {connector, connector()}
, {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))} , {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))}
, {max_payload_size, t(emqx_schema:bytesize(), "1MB")} , {max_payload_size, t(emqx_schema:bytesize(), "1MB")}
, {config, config()}
]; ];
fields(mnesia_connector) -> fields(mnesia_config) ->
[ {type, ?TYPE(hoconsc:union([mnesia]))} [ {type, ?TYPE(hoconsc:union([built_in_database]))}
, {config, ?TYPE(hoconsc:ref(?MODULE, mnesia_connector_cfg))} , {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)}
];
fields(mnesia_connector_cfg) ->
[ {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)}
, {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)} , {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)}
]; ];
@ -43,11 +39,8 @@ t(Type, Default, Validator) ->
hoconsc:t(Type, #{default => Default, hoconsc:t(Type, #{default => Default,
validator => Validator}). validator => Validator}).
union_array(Item) when is_list(Item) ->
hoconsc:array(hoconsc:union(Item)).
is_pos_integer(V) -> is_pos_integer(V) ->
V >= 0. V >= 0.
connector() -> config() ->
#{type => union_array([hoconsc:ref(?MODULE, mnesia_connector)])}. #{type => hoconsc:union([hoconsc:ref(?MODULE, mnesia_config)])}.

View File

@ -67,10 +67,9 @@ new_emqx_retainer_conf() ->
#{enable => true, #{enable => true,
msg_expiry_interval => 0, msg_expiry_interval => 0,
msg_clear_interval => 0, msg_clear_interval => 0,
connector => [#{type => mnesia, config => #{type => built_in_database,
config => max_retained_messages => 0,
#{max_retained_messages => 0, storage_type => ram},
storage_type => ram}}],
flow_control => #{max_read_number => 0, flow_control => #{max_read_number => 0,
msg_deliver_quota => 0, msg_deliver_quota => 0,
quota_release_interval => 0}, quota_release_interval => 0},

View File

@ -302,9 +302,19 @@ bootstrapd() {
# check if a PID is down # check if a PID is down
is_down() { is_down() {
PID="$1" PID="$1"
if kill -s 0 "$PID" 2>/dev/null; then if ps -p "$PID" >/dev/null; then
# still around
# shellcheck disable=SC2009 # this grep pattern is not a part of the progra names
if ps -p "$PID" | grep -q 'defunct'; then
# zombie state, print parent pid
parent="$(ps -o ppid= -p "$PID" | tr -d ' ')"
echo "WARN: $PID is marked <defunct>, parent:"
ps -p "$parent"
return 0
fi
return 1 return 1
fi fi
# it's gone
return 0 return 0
} }
@ -484,12 +494,12 @@ case "$1" in
exit 1 exit 1
fi fi
WAIT_TIME="${WAIT_FOR_ERLANG_STOP:-60}" WAIT_TIME="${WAIT_FOR_ERLANG_STOP:-60}"
if ! wait_for "$WAIT_TIME" is_down "$PID"; then if ! wait_for "$WAIT_TIME" 'is_down' "$PID"; then
msg="dangling after ${WAIT_TIME} seconds" msg="dangling after ${WAIT_TIME} seconds"
# also log to syslog # also log to syslog
logger -t "${REL_NAME}[${PID}]" "STOP: $msg" logger -t "${REL_NAME}[${PID}]" "STOP: $msg"
# log to user console # log to user console
echoerr "STOP: $msg" echoerr "stop failed, $msg"
exit 1 exit 1
fi fi
logger -t "${REL_NAME}[${PID}]" "STOP: OK" logger -t "${REL_NAME}[${PID}]" "STOP: OK"

View File

@ -10,7 +10,8 @@
{edoc_opts, [{preprocess,true}]}. {edoc_opts, [{preprocess,true}]}.
{erl_opts, [warn_unused_vars,warn_shadow_vars,warn_unused_import, {erl_opts, [warn_unused_vars,warn_shadow_vars,warn_unused_import,
warn_obsolete_guard,compressed, nowarn_unused_import, warn_obsolete_guard,compressed, nowarn_unused_import,
{d, snk_kind, msg}]}. {d, snk_kind, msg}
]}.
{xref_checks,[undefined_function_calls,undefined_functions,locals_not_used, {xref_checks,[undefined_function_calls,undefined_functions,locals_not_used,
deprecated_function_calls,warnings_as_errors,deprecated_functions]}. deprecated_function_calls,warnings_as_errors,deprecated_functions]}.
@ -59,7 +60,7 @@
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {getopt, "1.0.2"} , {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.11.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.11.0"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.1.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.1.0"}}}

View File

@ -52,13 +52,17 @@ overrides() ->
[ {add, [ {extra_src_dirs, [{"etc", [{recursive,true}]}]} [ {add, [ {extra_src_dirs, [{"etc", [{recursive,true}]}]}
, {erl_opts, [{compile_info, [{emqx_vsn, get_vsn()}]}]} , {erl_opts, [{compile_info, [{emqx_vsn, get_vsn()}]}]}
]} ]}
, {add, snabbkaffe, ] ++ snabbkaffe_overrides() ++ community_plugin_overrides().
[{erl_opts, common_compile_opts()}]}
] ++ community_plugin_overrides().
community_plugin_overrides() -> community_plugin_overrides() ->
[{add, App, [ {erl_opts, [{i, "include"}]}]} || App <- relx_plugin_apps_extra()]. [{add, App, [ {erl_opts, [{i, "include"}]}]} || App <- relx_plugin_apps_extra()].
%% Temporary workaround for a rebar3 erl_opts duplication
%% bug. Ideally, we want to set this define globally
snabbkaffe_overrides() ->
Apps = [snabbkaffe, ekka],
[{add, App, [{erl_opts, [{d, snk_kind, msg}]}]} || App <- Apps].
config(HasElixir) -> config(HasElixir) ->
[ {cover_enabled, is_cover_enabled()} [ {cover_enabled, is_cover_enabled()}
, {profiles, profiles()} , {profiles, profiles()}
@ -129,12 +133,9 @@ test_deps() ->
]. ].
common_compile_opts() -> common_compile_opts() ->
AppNames = app_names(),
[ debug_info % alwyas include debug_info [ debug_info % alwyas include debug_info
, {compile_info, [{emqx_vsn, get_vsn()}]} , {compile_info, [{emqx_vsn, get_vsn()}]}
, {d, snk_kind, msg}
] ++ ] ++
[{d, 'EMQX_DEP_APPS', AppNames -- [emqx, emqx_machine]}] ++
[{d, 'EMQX_ENTERPRISE'} || is_enterprise()] ++ [{d, 'EMQX_ENTERPRISE'} || is_enterprise()] ++
[{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1" ]. [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1" ].