diff --git a/.ci/build_packages/tests.sh b/.ci/build_packages/tests.sh index 3b08f838a..b01d6ca27 100755 --- a/.ci/build_packages/tests.sh +++ b/.ci/build_packages/tests.sh @@ -1,11 +1,10 @@ #!/bin/bash set -x -e -u +export DEBUG=1 export CODE_PATH=${CODE_PATH:-"/emqx"} export EMQX_NAME=${EMQX_NAME:-"emqx"} export PACKAGE_PATH="${CODE_PATH}/_packages/${EMQX_NAME}" export RELUP_PACKAGE_PATH="${CODE_PATH}/_upgrade_base" -# export EMQX_NODE_NAME="emqx-on-$(uname -m)@127.0.0.1" -# export EMQX_NODE_COOKIE=$(date +%s%N) case "$(uname -m)" in x86_64) @@ -122,6 +121,9 @@ run_test(){ tee -a "$emqx_env_vars" < /usr/local/etc/haproxy/certs/emqx.pem + set -x + cat /usr/local/etc/haproxy/certs/cert.pem /usr/local/etc/haproxy/certs/key.pem > /tmp/emqx.pem haproxy -f /usr/local/etc/haproxy/haproxy.cfg emqx1: diff --git a/.ci/docker-compose-file/docker-compose.yaml b/.ci/docker-compose-file/docker-compose.yaml index a502aeb6e..8a5fec042 100644 --- a/.ci/docker-compose-file/docker-compose.yaml +++ b/.ci/docker-compose-file/docker-compose.yaml @@ -1,8 +1,8 @@ version: '3.9' services: - erlang: - container_name: erlang + erlang23: + container_name: erlang23 image: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 env_file: - conf.env @@ -21,6 +21,26 @@ services: working_dir: /emqx tty: true + erlang24: + container_name: erlang24 + image: emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04 + env_file: + - conf.env + environment: + GITHUB_ACTIONS: ${GITHUB_ACTIONS} + GITHUB_TOKEN: ${GITHUB_TOKEN} + GITHUB_RUN_ID: ${GITHUB_RUN_ID} + GITHUB_SHA: ${GITHUB_SHA} + GITHUB_RUN_NUMBER: ${GITHUB_RUN_NUMBER} + GITHUB_EVENT_NAME: ${GITHUB_EVENT_NAME} + GITHUB_REF: ${GITHUB_REF} + networks: + - emqx_bridge + volumes: + - ../..:/emqx + working_dir: /emqx + tty: true + networks: emqx_bridge: driver: bridge diff --git a/.ci/docker-compose-file/haproxy/haproxy.cfg b/.ci/docker-compose-file/haproxy/haproxy.cfg index 73c219d55..4361ccadb 100644 --- a/.ci/docker-compose-file/haproxy/haproxy.cfg +++ b/.ci/docker-compose-file/haproxy/haproxy.cfg @@ -11,6 +11,7 @@ global tune.ssl.default-dh-param 2048 ssl-default-bind-ciphers ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:kEDH+AESGCM:ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA:ECDHE-ECDSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA256:DHE-RSA-AES256-SHA256:DHE-DSS-AES256-SHA:DHE-RSA-AES256-SHA:ECDHE-RSA-DES-CBC3-SHA:ECDHE-ECDSA-DES-CBC3-SHA:EDH-RSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:AES:DES-CBC3-SHA:HIGH:SEED:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!RSAPSK:!aDH:!aECDH:!EDH-DSS-DES-CBC3-SHA:!KRB5-DES-CBC3-SHA:!SRP # Enable the HAProxy Runtime API + # e.g. echo "show table emqx_tcp_back" | sudo socat stdio tcp4-connect:172.100.239.4:9999 stats socket :9999 level admin expose-fd listeners ##---------------------------------------------------------------- @@ -61,6 +62,8 @@ frontend emqx_tcp mode tcp option tcplog bind *:1883 + # Reject connections that have an invalid MQTT packet + # tcp-request content reject unless { req.payload(0,0), mqtt_is_valid } default_backend emqx_tcp_back frontend emqx_ws @@ -71,7 +74,13 @@ frontend emqx_ws backend emqx_tcp_back mode tcp - balance static-rr + + # Create a stick table for session persistence + stick-table type string len 32 size 100k expire 30m + + # Use ClientID / client_identifier as persistence key + stick on req.payload(0,0),mqtt_field_value(connect,client_identifier) + server emqx-1 node1.emqx.io:1883 check-send-proxy send-proxy-v2 server emqx-2 node2.emqx.io:1883 check-send-proxy send-proxy-v2 @@ -87,19 +96,19 @@ backend emqx_ws_back frontend emqx_ssl mode tcp option tcplog - bind *:8883 ssl crt /usr/local/etc/haproxy/certs/emqx.pem ca-file /usr/local/etc/haproxy/certs/cacert.pem verify required no-sslv3 + bind *:8883 ssl crt /tmp/emqx.pem ca-file /usr/local/etc/haproxy/certs/cacert.pem verify required no-sslv3 default_backend emqx_ssl_back frontend emqx_wss mode tcp option tcplog - bind *:8084 ssl crt /usr/local/etc/haproxy/certs/emqx.pem ca-file /usr/local/etc/haproxy/certs/cacert.pem verify required no-sslv3 + bind *:8084 ssl crt /tmp/emqx.pem ca-file /usr/local/etc/haproxy/certs/cacert.pem verify required no-sslv3 default_backend emqx_wss_back backend emqx_ssl_back mode tcp balance static-rr - server emqx-1 node1.emqx.io:1883 check-send-proxy send-proxy-v2-ssl-cn + server emqx-1 node1.emqx.io:1883 check-send-proxy send-proxy-v2-ssl-cn server emqx-2 node2.emqx.io:1883 check-send-proxy send-proxy-v2-ssl-cn backend emqx_wss_back diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index 0b8316b61..7388b4d05 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -10,8 +10,14 @@ on: jobs: prepare: + strategy: + matrix: + container: + - "emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04" + - "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04" + runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 + container: ${{ matrix.container }} outputs: profiles: ${{ steps.set_profile.outputs.profiles}} @@ -79,7 +85,7 @@ jobs: - uses: gleam-lang/setup-erlang@v1.1.0 id: install_erlang with: - otp-version: 23.2 + otp-version: 24.0.5 - name: build env: PYTHON: python @@ -135,7 +141,7 @@ jobs: matrix: profile: ${{fromJSON(needs.prepare.outputs.profiles)}} erl_otp: - - 23.2.7.2-emqx-2 + - 24.0.5-emqx-1 exclude: - profile: emqx-edge @@ -213,6 +219,9 @@ jobs: fail-fast: false matrix: profile: ${{fromJSON(needs.prepare.outputs.profiles)}} + erl_otp: + - 23.2.7.2-emqx-2 + - 24.0.5-emqx-1 arch: - amd64 - arm64 @@ -228,8 +237,6 @@ jobs: - centos6 - raspbian10 # - raspbian9 - erl_otp: - - 23.2.7.2-emqx-2 exclude: - os: centos6 arch: arm64 @@ -332,7 +339,7 @@ jobs: matrix: profile: ${{fromJSON(needs.prepare.outputs.profiles)}} erl_otp: - - 23.2.7.2-emqx-2 + - 24.0.5-emqx-1 steps: - uses: actions/download-artifact@v2 diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 162959040..9e19889fc 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -15,7 +15,8 @@ jobs: strategy: matrix: erl_otp: - - erl23.2.7.2-emqx-2 + - erl24.0.5-emqx-1 + os: - ubuntu20.04 - centos7 @@ -43,7 +44,7 @@ jobs: with: name: rebar3.crashdump path: ./rebar3.crashdump - - name: pakcages test + - name: packages test run: | export CODE_PATH=$GITHUB_WORKSPACE .ci/build_packages/tests.sh @@ -58,7 +59,7 @@ jobs: strategy: matrix: erl_otp: - - 23.2.7.2-emqx-2 + - 24.0.5-emqx-1 steps: - uses: actions/checkout@v1 diff --git a/.github/workflows/check_deps_integrity.yaml b/.github/workflows/check_deps_integrity.yaml index 0aa8f7903..96cd71bf0 100644 --- a/.github/workflows/check_deps_integrity.yaml +++ b/.github/workflows/check_deps_integrity.yaml @@ -4,8 +4,13 @@ on: [pull_request] jobs: check_deps_integrity: + strategy: + matrix: + container: + - "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04" + runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 + container: ${{ matrix.container }} steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/run_emqx_app_tests.yaml b/.github/workflows/run_emqx_app_tests.yaml index 56089ffcf..d05d41969 100644 --- a/.github/workflows/run_emqx_app_tests.yaml +++ b/.github/workflows/run_emqx_app_tests.yaml @@ -9,8 +9,14 @@ on: jobs: check_all: + strategy: + matrix: + container: + - "emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04" + - "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04" + runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 + container: ${{ matrix.container }} steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/run_fvt_tests.yaml b/.github/workflows/run_fvt_tests.yaml index 108434cb5..272d69ca7 100644 --- a/.github/workflows/run_fvt_tests.yaml +++ b/.github/workflows/run_fvt_tests.yaml @@ -16,7 +16,7 @@ jobs: - uses: gleam-lang/setup-erlang@v1.1.2 id: install_erlang with: - otp-version: 23.2 + otp-version: 24.0.5 - name: prepare run: | if make emqx-ee --dry-run > /dev/null 2>&1; then @@ -59,8 +59,11 @@ jobs: - name: make paho tests run: | if ! docker exec -i python /scripts/pytest.sh; then + echo "DUMP_CONTAINER_LOGS_BGN" + docker logs haproxy docker logs node1.emqx.io docker logs node2.emqx.io + echo "DUMP_CONTAINER_LOGS_END" exit 1 fi @@ -72,7 +75,7 @@ jobs: - uses: gleam-lang/setup-erlang@v1.1.2 id: install_erlang with: - otp-version: 23.2 + otp-version: 24.0.5 - name: prepare run: | if make emqx-ee --dry-run > /dev/null 2>&1; then @@ -183,8 +186,15 @@ jobs: exit $RESULT relup_test: + strategy: + matrix: + container: + - "emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04" + - "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04" + runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 + container: ${{ matrix.container }} + defaults: run: shell: bash @@ -295,4 +305,3 @@ jobs: with: name: lux_logs path: lux_logs - diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 812058111..c2b30ed3d 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -9,8 +9,14 @@ on: jobs: run_static_analysis: + strategy: + matrix: + container: + - "emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04" + - "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04" + runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 + container: ${{ matrix.container }} steps: - uses: actions/checkout@v2 @@ -26,8 +32,14 @@ jobs: run: make dialyzer run_proper_test: + strategy: + matrix: + container: + - "emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04" + - "emqx/build-env:erl24.0.5-emqx-1-ubuntu20.04" + runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 + container: ${{ matrix.container }} steps: - uses: actions/checkout@v2 @@ -41,6 +53,12 @@ jobs: run: make proper run_common_test: + strategy: + matrix: + otp_release: + - "erlang23" + - "erlang24" + runs-on: ubuntu-20.04 steps: @@ -73,15 +91,15 @@ jobs: up -d --build - name: run eunit run: | - docker exec -i erlang bash -c "make eunit" + docker exec -i ${{ matrix.otp_release }} bash -c "make eunit" - name: run common test run: | - docker exec -i erlang bash -c "make ct" + docker exec -i ${{ matrix.otp_release }} bash -c "make ct" - name: run cover run: | printenv > .env - docker exec -i erlang bash -c "make cover" - docker exec --env-file .env -i erlang bash -c "make coveralls" + docker exec -i ${{ matrix.otp_release }} bash -c "make cover" + docker exec --env-file .env -i ${{ matrix.otp_release }} bash -c "make coveralls" - name: cat rebar.crashdump if: failure() run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi diff --git a/.tool-versions b/.tool-versions index b87853803..3eb01f497 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1 @@ -erlang 24.0.1-emqx-1 +erlang 24.0.5-emqx-1 diff --git a/Makefile b/Makefile index 6f21a6147..dc44d208b 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ BUILD = $(CURDIR)/build SCRIPTS = $(CURDIR)/scripts export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) export EMQX_DESC ?= EMQ X -export EMQX_DASHBOARD_VERSION ?= v5.0.0-beta.3 +export EMQX_DASHBOARD_VERSION ?= v5.0.0-beta.4 ifeq ($(OS),Windows_NT) export REBAR_COLOR=none endif diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 7b72f12d3..ebe46559b 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -18,7 +18,7 @@ , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.11.0"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} - , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} ]}. {plugins, [rebar3_proper]}. diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index e2d2c8207..151cf8e8e 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -32,6 +32,7 @@ -export([ get/1 , get/2 , find/1 + , find_raw/1 , put/1 , put/2 ]). @@ -60,11 +61,19 @@ , put_raw/2 ]). --define(CONF, fun(ROOT) -> {?MODULE, bin(ROOT)} end). --define(RAW_CONF, fun(ROOT) -> {?MODULE, raw, bin(ROOT)} end). +-define(CONF, conf). +-define(RAW_CONF, raw_conf). +-define(PERSIS_KEY(TYPE, ROOT), {?MODULE, TYPE, ROOT}). -define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]). -define(LISTENER_CONF_PATH(ZONE, LISTENER, PATH), [zones, ZONE, listeners, LISTENER | PATH]). +-define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL), + try [atom(Key) || Key <- PATH] of + AtomKeyPath -> EXP + catch + error:badarg -> EXP_ON_FAIL + end). + -export_type([update_request/0, raw_config/0, config/0]). -type update_request() :: term(). %% raw_config() is the config that is NOT parsed and tranlated by hocon schema @@ -93,8 +102,28 @@ get(KeyPath, Default) -> do_get(?CONF, KeyPath, Default). -spec find(emqx_map_lib:config_key_path()) -> {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}. +find([]) -> + Ref = make_ref(), + Res = do_get(?CONF, [], Ref), + case Res =:= Ref of + true -> {not_found, []}; + false -> {ok, Res} + end; find(KeyPath) -> - emqx_map_lib:deep_find(KeyPath, get_root(KeyPath)). + ?ATOM_CONF_PATH(KeyPath, emqx_map_lib:deep_find(AtomKeyPath, get_root(KeyPath)), + {not_found, KeyPath}). + +-spec find_raw(emqx_map_lib:config_key_path()) -> + {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}. +find_raw([]) -> + Ref = make_ref(), + Res = do_get(?RAW_CONF, [], Ref), + case Res =:= Ref of + true -> {not_found, []}; + false -> {ok, Res} + end; +find_raw(KeyPath) -> + emqx_map_lib:deep_find([bin(Key) || Key <- KeyPath], get_root_raw(KeyPath)). -spec get_zone_conf(atom(), emqx_map_lib:config_key_path()) -> term(). get_zone_conf(Zone, KeyPath) -> @@ -141,20 +170,20 @@ put(KeyPath, Config) -> do_put(?CONF, KeyPath, Config). -spec update(emqx_map_lib:config_key_path(), update_request()) -> ok | {error, term()}. -update(ConfKeyPath, UpdateReq) -> - update(emqx_schema, ConfKeyPath, UpdateReq). +update(KeyPath, UpdateReq) -> + update(emqx_schema, KeyPath, UpdateReq). -spec update(module(), emqx_map_lib:config_key_path(), update_request()) -> ok | {error, term()}. -update(SchemaModule, ConfKeyPath, UpdateReq) -> - emqx_config_handler:update_config(SchemaModule, ConfKeyPath, UpdateReq). +update(SchemaModule, KeyPath, UpdateReq) -> + emqx_config_handler:update_config(SchemaModule, KeyPath, {update, UpdateReq}). -spec remove(emqx_map_lib:config_key_path()) -> ok | {error, term()}. -remove(ConfKeyPath) -> - remove(emqx_schema, ConfKeyPath). +remove(KeyPath) -> + remove(emqx_schema, KeyPath). -remove(SchemaModule, ConfKeyPath) -> - emqx_config_handler:remove_config(SchemaModule, ConfKeyPath). +remove(SchemaModule, KeyPath) -> + emqx_config_handler:update_config(SchemaModule, KeyPath, remove). -spec get_raw(emqx_map_lib:config_key_path()) -> term(). get_raw(KeyPath) -> do_get(?RAW_CONF, KeyPath). @@ -262,24 +291,60 @@ load_hocon_file(FileName, LoadType) -> emqx_override_conf_name() -> application:get_env(emqx, override_conf_file, "emqx_override.conf"). -bin(Bin) when is_binary(Bin) -> Bin; -bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). - -do_get(PtKey, KeyPath) -> +do_get(Type, KeyPath) -> Ref = make_ref(), - Res = do_get(PtKey, KeyPath, Ref), + Res = do_get(Type, KeyPath, Ref), case Res =:= Ref of true -> error({config_not_found, KeyPath}); false -> Res end. -do_get(PtKey, [RootName], Default) -> - persistent_term:get(PtKey(RootName), Default); -do_get(PtKey, [RootName | KeyPath], Default) -> - RootV = persistent_term:get(PtKey(RootName), #{}), - emqx_map_lib:deep_get(KeyPath, RootV, Default). +do_get(Type, [], Default) -> + AllConf = lists:foldl(fun + ({?PERSIS_KEY(Type0, RootName), Conf}, AccIn) when Type0 == Type -> + AccIn#{conf_key(Type0, RootName) => Conf}; + (_, AccIn) -> AccIn + end, #{}, persistent_term:get()), + case map_size(AllConf) == 0 of + true -> Default; + false -> AllConf + end; +do_get(Type, [RootName], Default) -> + persistent_term:get(?PERSIS_KEY(Type, bin(RootName)), Default); +do_get(Type, [RootName | KeyPath], Default) -> + RootV = persistent_term:get(?PERSIS_KEY(Type, bin(RootName)), #{}), + do_deep_get(Type, KeyPath, RootV, Default). -do_put(PtKey, [RootName | KeyPath], DeepValue) -> - OldValue = do_get(PtKey, [RootName], #{}), - NewValue = emqx_map_lib:deep_put(KeyPath, OldValue, DeepValue), - persistent_term:put(PtKey(RootName), NewValue). +do_put(Type, [], DeepValue) -> + maps:fold(fun(RootName, Value, _Res) -> + do_put(Type, [RootName], Value) + end, ok, DeepValue); +do_put(Type, [RootName | KeyPath], DeepValue) -> + OldValue = do_get(Type, [RootName], #{}), + NewValue = do_deep_put(Type, KeyPath, OldValue, DeepValue), + persistent_term:put(?PERSIS_KEY(Type, bin(RootName)), NewValue). + +do_deep_get(?CONF, KeyPath, Map, Default) -> + ?ATOM_CONF_PATH(KeyPath, emqx_map_lib:deep_get(AtomKeyPath, Map, Default), + Default); +do_deep_get(?RAW_CONF, KeyPath, Map, Default) -> + emqx_map_lib:deep_get([bin(Key) || Key <- KeyPath], Map, Default). + +do_deep_put(?CONF, KeyPath, Map, Value) -> + ?ATOM_CONF_PATH(KeyPath, emqx_map_lib:deep_put(AtomKeyPath, Map, Value), + error({not_found, KeyPath})); +do_deep_put(?RAW_CONF, KeyPath, Map, Value) -> + emqx_map_lib:deep_put([bin(Key) || Key <- KeyPath], Map, Value). + +atom(Bin) when is_binary(Bin) -> + binary_to_existing_atom(Bin, latin1); +atom(Atom) when is_atom(Atom) -> + Atom. + +bin(Bin) when is_binary(Bin) -> Bin; +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). + +conf_key(?CONF, RootName) -> + atom(RootName); +conf_key(?RAW_CONF, RootName) -> + bin(RootName). diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index 9a830bf4d..e4285b503 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -25,7 +25,6 @@ -export([ start_link/0 , add_handler/2 , update_config/3 - , remove_config/2 , merge_to_old_config/2 ]). @@ -38,10 +37,10 @@ code_change/3]). -define(MOD, {mod}). --define(REMOVE_CONF, '$remove_config'). -type handler_name() :: module(). -type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}. +-type update_args() :: {update, emqx_config:update_request()} | remove. -optional_callbacks([ pre_config_update/2 , post_config_update/3 @@ -61,15 +60,10 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, {}, []). --spec update_config(module(), emqx_config:config_key_path(), emqx_config:update_request()) -> +-spec update_config(module(), emqx_config:config_key_path(), update_args()) -> ok | {error, term()}. -update_config(SchemaModule, ConfKeyPath, UpdateReq) when UpdateReq =/= ?REMOVE_CONF -> - gen_server:call(?MODULE, {change_config, SchemaModule, ConfKeyPath, UpdateReq}). - --spec remove_config(module(), emqx_config:config_key_path()) -> - ok | {error, term()}. -remove_config(SchemaModule, ConfKeyPath) -> - gen_server:call(?MODULE, {change_config, SchemaModule, ConfKeyPath, ?REMOVE_CONF}). +update_config(SchemaModule, ConfKeyPath, UpdateArgs) -> + gen_server:call(?MODULE, {change_config, SchemaModule, ConfKeyPath, UpdateArgs}). -spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok. add_handler(ConfKeyPath, HandlerName) -> @@ -86,15 +80,15 @@ handle_call({add_child, ConfKeyPath, HandlerName}, _From, {reply, ok, State#{handlers => emqx_map_lib:deep_put(ConfKeyPath, Handlers, #{?MOD => HandlerName})}}; -handle_call({change_config, SchemaModule, ConfKeyPath, UpdateReq}, _From, +handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From, #{handlers := Handlers} = State) -> OldConf = emqx_config:get_root(ConfKeyPath), OldRawConf = emqx_config:get_root_raw(ConfKeyPath), Result = try {NewRawConf, OverrideConf} = process_upadate_request(ConfKeyPath, OldRawConf, - Handlers, UpdateReq), + Handlers, UpdateArgs), {AppEnvs, CheckedConf} = emqx_config:check_config(SchemaModule, NewRawConf), - _ = do_post_config_update(ConfKeyPath, Handlers, OldConf, CheckedConf, UpdateReq), + _ = do_post_config_update(ConfKeyPath, Handlers, OldConf, CheckedConf, UpdateArgs), emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf) catch Error:Reason:ST -> ?LOG(error, "change_config failed: ~p", [{Error, Reason, ST}]), @@ -118,12 +112,12 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -process_upadate_request(ConfKeyPath, OldRawConf, _Handlers, ?REMOVE_CONF) -> +process_upadate_request(ConfKeyPath, OldRawConf, _Handlers, remove) -> BinKeyPath = bin_path(ConfKeyPath), NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf), OverrideConf = emqx_map_lib:deep_remove(BinKeyPath, emqx_config:read_override_conf()), {NewRawConf, OverrideConf}; -process_upadate_request(ConfKeyPath, OldRawConf, Handlers, UpdateReq) -> +process_upadate_request(ConfKeyPath, OldRawConf, Handlers, {update, UpdateReq}) -> NewRawConf = do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq), OverrideConf = update_override_config(NewRawConf), {NewRawConf, OverrideConf}. @@ -136,14 +130,14 @@ do_update_config([ConfKey | ConfKeyPath], Handlers, OldRawConf, UpdateReq) -> NewUpdateReq = do_update_config(ConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq), call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq}). -do_post_config_update([], Handlers, OldConf, NewConf, UpdateReq) -> - call_post_config_update(Handlers, OldConf, NewConf, UpdateReq); -do_post_config_update([ConfKey | ConfKeyPath], Handlers, OldConf, NewConf, UpdateReq) -> +do_post_config_update([], Handlers, OldConf, NewConf, UpdateArgs) -> + call_post_config_update(Handlers, OldConf, NewConf, up_req(UpdateArgs)); +do_post_config_update([ConfKey | ConfKeyPath], Handlers, OldConf, NewConf, UpdateArgs) -> SubOldConf = get_sub_config(ConfKey, OldConf), SubNewConf = get_sub_config(ConfKey, NewConf), SubHandlers = maps:get(ConfKey, Handlers, #{}), - _ = do_post_config_update(ConfKeyPath, SubHandlers, SubOldConf, SubNewConf, UpdateReq), - call_post_config_update(Handlers, OldConf, NewConf, UpdateReq). + _ = do_post_config_update(ConfKeyPath, SubHandlers, SubOldConf, SubNewConf, UpdateArgs), + call_post_config_update(Handlers, OldConf, NewConf, up_req(UpdateArgs)). get_sub_config(ConfKey, Conf) when is_map(Conf) -> maps:get(ConfKey, Conf, undefined); @@ -178,6 +172,9 @@ update_override_config(RawConf) -> OldConf = emqx_config:read_override_conf(), maps:merge(OldConf, RawConf). +up_req(remove) -> '$remove'; +up_req({update, Req}) -> Req. + bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath]. bin(A) when is_atom(A) -> atom_to_binary(A, utf8); diff --git a/apps/emqx_authz/README.md b/apps/emqx_authz/README.md index 420898c95..bc94578c0 100644 --- a/apps/emqx_authz/README.md +++ b/apps/emqx_authz/README.md @@ -51,7 +51,7 @@ authz:{ cmd: "HGETALL mqtt_authz:%u" }, { - principal: {username: "^admin?"} + principal: {username: "^admin?"} permission: allow action: subscribe topics: ["$SYS/#"] diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 50c8e5122..5c46e7749 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -20,11 +20,17 @@ -include("emqx_authz.hrl"). -include_lib("emqx/include/logger.hrl"). +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. -export([ register_metrics/0 , init/0 , init_rule/1 , lookup/0 + , lookup/1 + , move/2 , update/2 , authorize/5 , match/4 @@ -41,42 +47,160 @@ register_metrics() -> init() -> ok = register_metrics(), emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), - NRules = [init_rule(Rule) || Rule <- lookup()], + NRules = [init_rule(Rule) || Rule <- emqx_config:get(?CONF_KEY_PATH, [])], ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NRules]}, -1). lookup() -> - emqx_config:get(?CONF_KEY_PATH, []). + {_M, _F, [A]}= find_action_in_hooks(), + A. +lookup(Id) -> + try find_rule_by_id(Id, lookup()) of + {_, Rule} -> Rule + catch + error:Reason -> {error, Reason} + end. + +move(Id, Position) -> + emqx_config:update(emqx_authz_schema, ?CONF_KEY_PATH, {move, Id, Position}). update(Cmd, Rules) -> emqx_config:update(emqx_authz_schema, ?CONF_KEY_PATH, {Cmd, Rules}). -%% For now we only support re-creating the entire rule list -pre_config_update({head, Rule}, OldConf) when is_map(Rule), is_list(OldConf) -> - [Rule | OldConf]; -pre_config_update({tail, Rule}, OldConf) when is_map(Rule), is_list(OldConf) -> - OldConf ++ [Rule]; -pre_config_update({_, NewConf}, _OldConf) -> - %% overwrite the entire config! - case is_list(NewConf) of - true -> NewConf; - false -> [NewConf] - end. +pre_config_update({move, Id, <<"top">>}, Conf) when is_list(Conf) -> + {Index, _} = find_rule_by_id(Id), + {List1, List2} = lists:split(Index, Conf), + [lists:nth(Index, Conf)] ++ lists:droplast(List1) ++ List2; -post_config_update(_, undefined, _OldConf) -> - %_ = [release_rules(Rule) || Rule <- OldConf], +pre_config_update({move, Id, <<"bottom">>}, Conf) when is_list(Conf) -> + {Index, _} = find_rule_by_id(Id), + {List1, List2} = lists:split(Index, Conf), + lists:droplast(List1) ++ List2 ++ [lists:nth(Index, Conf)]; + +pre_config_update({move, Id, #{<<"before">> := BeforeId}}, Conf) when is_list(Conf) -> + {Index1, _} = find_rule_by_id(Id), + Conf1 = lists:nth(Index1, Conf), + {Index2, _} = find_rule_by_id(BeforeId), + Conf2 = lists:nth(Index2, Conf), + + {List1, List2} = lists:split(Index2, Conf), + lists:delete(Conf1, lists:droplast(List1)) + ++ [Conf1] ++ [Conf2] + ++ lists:delete(Conf1, List2); + +pre_config_update({move, Id, #{<<"after">> := AfterId}}, Conf) when is_list(Conf) -> + {Index1, _} = find_rule_by_id(Id), + Conf1 = lists:nth(Index1, Conf), + {Index2, _} = find_rule_by_id(AfterId), + + {List1, List2} = lists:split(Index2, Conf), + lists:delete(Conf1, List1) + ++ [Conf1] + ++ lists:delete(Conf1, List2); + +pre_config_update({head, Rules}, Conf) when is_list(Rules), is_list(Conf) -> + Rules ++ Conf; +pre_config_update({tail, Rules}, Conf) when is_list(Rules), is_list(Conf) -> + Conf ++ Rules; +pre_config_update({{replace_once, Id}, Rule}, Conf) when is_map(Rule), is_list(Conf) -> + {Index, _} = find_rule_by_id(Id), + {List1, List2} = lists:split(Index, Conf), + lists:droplast(List1) ++ [Rule] ++ List2; +pre_config_update({_, Rules}, _Conf) when is_list(Rules)-> + %% overwrite the entire config! + Rules. + +post_config_update(_, undefined, _Conf) -> ok; +post_config_update({move, Id, <<"top">>}, _NewRules, _OldRules) -> + InitedRules = lookup(), + {Index, Rule} = find_rule_by_id(Id, InitedRules), + {Rules1, Rules2 } = lists:split(Index, InitedRules), + Rules3 = [Rule] ++ lists:droplast(Rules1) ++ Rules2, + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Rules3]}, -1), + ok = emqx_authz_cache:drain_cache(); +post_config_update({move, Id, <<"bottom">>}, _NewRules, _OldRules) -> + InitedRules = lookup(), + {Index, Rule} = find_rule_by_id(Id, InitedRules), + {Rules1, Rules2 } = lists:split(Index, InitedRules), + Rules3 = lists:droplast(Rules1) ++ Rules2 ++ [Rule], + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Rules3]}, -1), + ok = emqx_authz_cache:drain_cache(); +post_config_update({move, Id, #{<<"before">> := BeforeId}}, _NewRules, _OldRules) -> + InitedRules = lookup(), + {_, Rule0} = find_rule_by_id(Id, InitedRules), + {Index, Rule1} = find_rule_by_id(BeforeId, InitedRules), + {Rules1, Rules2} = lists:split(Index, InitedRules), + Rules3 = lists:delete(Rule0, lists:droplast(Rules1)) + ++ [Rule0] ++ [Rule1] + ++ lists:delete(Rule0, Rules2), + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Rules3]}, -1), + ok = emqx_authz_cache:drain_cache(); + +post_config_update({move, Id, #{<<"after">> := AfterId}}, _NewRules, _OldRules) -> + InitedRules = lookup(), + {_, Rule} = find_rule_by_id(Id, InitedRules), + {Index, _} = find_rule_by_id(AfterId, InitedRules), + {Rules1, Rules2} = lists:split(Index, InitedRules), + Rules3 = lists:delete(Rule, Rules1) + ++ [Rule] + ++ lists:delete(Rule, Rules2), + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Rules3]}, -1), + ok = emqx_authz_cache:drain_cache(); + +post_config_update({head, Rules}, _NewRules, _OldConf) -> + InitedRules = [init_rule(R) || R <- check_rules(Rules)], + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedRules ++ lookup()]}, -1), + ok = emqx_authz_cache:drain_cache(); + +post_config_update({tail, Rules}, _NewRules, _OldConf) -> + InitedRules = [init_rule(R) || R <- check_rules(Rules)], + emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedRules]}, -1), + ok = emqx_authz_cache:drain_cache(); + +post_config_update({{replace_once, Id}, Rule}, _NewRules, _OldConf) when is_map(Rule) -> + OldInitedRules = lookup(), + {Index, OldRule} = find_rule_by_id(Id, OldInitedRules), + case maps:get(type, OldRule, undefined) of + undefined -> ok; + _ -> + #{annotations := #{id := Id}} = OldRule, + ok = emqx_resource:remove(Id) + end, + {OldRules1, OldRules2 } = lists:split(Index, OldInitedRules), + InitedRules = [init_rule(R#{annotations => #{id => Id}}) || R <- check_rules([Rule])], + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [lists:droplast(OldRules1) ++ InitedRules ++ OldRules2]}, -1), + ok = emqx_authz_cache:drain_cache(); + post_config_update(_, NewRules, _OldConf) -> - %_ = [release_rules(Rule) || Rule <- OldConf], + %% overwrite the entire config! + OldInitedRules = lookup(), InitedRules = [init_rule(Rule) || Rule <- NewRules], - Action = find_action_in_hooks(), - ok = emqx_hooks:del('client.authorize', Action), - ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [InitedRules]}, -1), + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedRules]}, -1), + lists:foreach(fun (#{type := _Type, enable := true, annotations := #{id := Id}}) -> + ok = emqx_resource:remove(Id); + (_) -> ok + end, OldInitedRules), ok = emqx_authz_cache:drain_cache(). %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +check_rules(RawRules) -> + {ok, Conf} = hocon:binary(jsx:encode(#{<<"authorization">> => #{<<"rules">> => RawRules}}), #{format => richmap}), + CheckConf = hocon_schema:check(emqx_authz_schema, Conf, #{atom_key => true}), + #{authorization := #{rules := Rules}} = hocon_schema:richmap_to_map(CheckConf), + Rules. + +find_rule_by_id(Id) -> find_rule_by_id(Id, lookup()). +find_rule_by_id(Id, Rules) -> find_rule_by_id(Id, Rules, 1). +find_rule_by_id(_RuleId, [], _N) -> error(not_found_rule); +find_rule_by_id(RuleId, [ Rule = #{annotations := #{id := Id}} | Tail], N) -> + case RuleId =:= Id of + true -> {N, Rule}; + false -> find_rule_by_id(RuleId, Tail, N + 1) + end. + find_action_in_hooks() -> Callbacks = emqx_hooks:lookup('client.authorize'), [Action] = [Action || {callback,{?MODULE, authorize, _} = Action, _, _} <- Callbacks ], @@ -85,6 +209,19 @@ find_action_in_hooks() -> gen_id(Type) -> iolist_to_binary([io_lib:format("~s_~s",[?APP, Type]), "_", integer_to_list(erlang:system_time())]). +create_resource(#{type := DB, + config := Config, + annotations := #{id := ResourceID}}) -> + case emqx_resource:update( + ResourceID, + list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])), + Config, + []) + of + {ok, _} -> ResourceID; + {error, already_created} -> ResourceID; + {error, Reason} -> {error, Reason} + end; create_resource(#{type := DB, config := Config}) -> ResourceID = gen_id(DB), @@ -102,13 +239,19 @@ create_resource(#{type := DB, init_rule(#{topics := Topics, action := Action, permission := Permission, - principal := Principal - } = Rule) when ?ALLOW_DENY(Permission), ?PUBSUB(Action), is_list(Topics) -> + principal := Principal, + annotations := #{id := Id} + } = Rule) when ?ALLOW_DENY(Permission), ?PUBSUB(Action), is_list(Topics) -> Rule#{annotations => - #{id => gen_id(simple), + #{id => Id, principal => compile_principal(Principal), topics => [compile_topic(Topic) || Topic <- Topics]} }; +init_rule(#{topics := Topics, + action := Action, + permission := Permission + } = Rule) when ?ALLOW_DENY(Permission), ?PUBSUB(Action), is_list(Topics) -> + init_rule(Rule#{annotations =>#{id => gen_id(simple)}}); init_rule(#{principal := Principal, enable := true, @@ -118,7 +261,7 @@ init_rule(#{principal := Principal, NConfig = maps:merge(Config, #{base_url => maps:remove(query, Url)}), case create_resource(Rule#{config := NConfig}) of {error, Reason} -> error({load_config_error, Reason}); - Id -> Rule#{annotations => + Id -> Rule#{annotations => #{id => Id, principal => compile_principal(Principal) } @@ -132,7 +275,7 @@ init_rule(#{principal := Principal, DB =:= mongo -> case create_resource(Rule) of {error, Reason} -> error({load_config_error, Reason}); - Id -> Rule#{annotations => + Id -> Rule#{annotations => #{id => Id, principal => compile_principal(Principal) } @@ -148,7 +291,7 @@ init_rule(#{principal := Principal, Mod = list_to_existing_atom(io_lib:format("~s_~s",[?APP, DB])), case create_resource(Rule) of {error, Reason} -> error({load_config_error, Reason}); - Id -> Rule#{annotations => + Id -> Rule#{annotations => #{id => Id, principal => compile_principal(Principal), sql => Mod:parse_query(SQL) diff --git a/apps/emqx_authz/src/emqx_authz_api.erl b/apps/emqx_authz/src/emqx_authz_api.erl index 974f72dbe..e6d1732a6 100644 --- a/apps/emqx_authz/src/emqx_authz_api.erl +++ b/apps/emqx_authz/src/emqx_authz_api.erl @@ -16,74 +16,502 @@ -module(emqx_authz_api). +-behavior(minirest_api). + -include("emqx_authz.hrl"). --rest_api(#{name => lookup_authz, - method => 'GET', - path => "/authz", - func => lookup_authz, - descr => "Lookup Authorization" - }). +-define(EXAMPLE_RETURNED_RULE1, + #{principal => <<"all">>, + permission => <<"allow">>, + action => <<"all">>, + topics => [<<"#">>], + annotations => #{id => 1} + }). --rest_api(#{name => update_authz, - method => 'PUT', - path => "/authz", - func => update_authz, - descr => "Rewrite authz list" - }). --rest_api(#{name => append_authz, - method => 'POST', - path => "/authz/append", - func => append_authz, - descr => "Add a new rule at the end of the authz list" - }). +-define(EXAMPLE_RETURNED_RULES, + #{rules => [?EXAMPLE_RETURNED_RULE1 + ] + }). --rest_api(#{name => push_authz, - method => 'POST', - path => "/authz/push", - func => push_authz, - descr => "Add a new rule at the start of the authz list" - }). +-define(EXAMPLE_RULE1, #{principal => <<"all">>, + permission => <<"allow">>, + action => <<"all">>, + topics => [<<"#">>]}). --export([ lookup_authz/2 - , update_authz/2 - , append_authz/2 - , push_authz/2 +-export([ api_spec/0 + , rules/2 + , rule/2 + , move_rule/2 ]). -lookup_authz(_Bindings, _Params) -> - return({ok, emqx_authz:lookup()}). +api_spec() -> + {[ rules_api() + , rule_api() + , move_rule_api() + ], definitions()}. -update_authz(_Bindings, Params) -> - Rules = form_rules(Params), - return(emqx_authz:update(replace, Rules)). +definitions() -> emqx_authz_api_schema:definitions(). -append_authz(_Bindings, Params) -> - Rules = form_rules(Params), - return(emqx_authz:update(tail, Rules)). +rules_api() -> + Metadata = #{ + get => #{ + description => "List authorization rules", + parameters => [ + #{ + name => page, + in => query, + schema => #{ + type => integer + }, + required => false + }, + #{ + name => limit, + in => query, + schema => #{ + type => integer + }, + required => false + } + ], + responses => #{ + <<"200">> => #{ + description => <<"OK">>, + content => #{ + 'application/json' => #{ + schema => #{ + type => object, + required => [rules], + properties => #{rules => #{ + type => array, + items => minirest:ref(<<"returned_rules">>) + } + } + }, + examples => #{ + rules => #{ + summary => <<"Rules">>, + value => jsx:encode(?EXAMPLE_RETURNED_RULES) + } + } + } + } + } + } + }, + post => #{ + description => "Add new rule", + requestBody => #{ + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"rules">>), + examples => #{ + simple_rule => #{ + summary => <<"Rules">>, + value => jsx:encode(?EXAMPLE_RULE1) + } + } + } + } + }, + responses => #{ + <<"204">> => #{description => <<"Created">>}, + <<"400">> => #{ + description => <<"Bad Request">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"error">>), + examples => #{ + example1 => #{ + summary => <<"Bad Request">>, + value => #{ + code => <<"BAD_REQUEST">>, + message => <<"Bad Request">> + } + } + } + } + } + } + } + }, + put => #{ -push_authz(_Bindings, Params) -> - Rules = form_rules(Params), - return(emqx_authz:update(head, Rules)). + description => "Update all rules", + requestBody => #{ + content => #{ + 'application/json' => #{ + schema => #{ + type => array, + items => minirest:ref(<<"returned_rules">>) + }, + examples => #{ + rules => #{ + summary => <<"Rules">>, + value => jsx:encode([?EXAMPLE_RULE1]) + } + } + } + } + }, + responses => #{ + <<"204">> => #{description => <<"Created">>}, + <<"400">> => #{ + description => <<"Bad Request">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"error">>), + examples => #{ + example1 => #{ + summary => <<"Bad Request">>, + value => #{ + code => <<"BAD_REQUEST">>, + message => <<"Bad Request">> + } + } + } + } + } + } + } + } + }, + {"/authorization", Metadata, rules}. -%%------------------------------------------------------------------------------ -%% Interval Funcs -%%------------------------------------------------------------------------------ +rule_api() -> + Metadata = #{ + get => #{ + description => "List authorization rules", + parameters => [ + #{ + name => id, + in => path, + schema => #{ + type => string + }, + required => true + } + ], + responses => #{ + <<"200">> => #{ + description => <<"OK">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"returned_rules">>), + examples => #{ + rules => #{ + summary => <<"Rules">>, + value => jsx:encode(?EXAMPLE_RETURNED_RULE1) + } + } + } + } + }, + <<"404">> => #{ + description => <<"Bad Request">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"error">>), + examples => #{ + example1 => #{ + summary => <<"Not Found">>, + value => #{ + code => <<"NOT_FOUND">>, + message => <<"rule xxx not found">> + } + } + } + } + } + } + } + }, + put => #{ + description => "Update rule", + parameters => [ + #{ + name => id, + in => path, + schema => #{ + type => string + }, + required => true + } + ], + requestBody => #{ + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"rules">>), + examples => #{ + simple_rule => #{ + summary => <<"Rules">>, + value => jsx:encode(?EXAMPLE_RULE1) + } + } + } + } + }, + responses => #{ + <<"204">> => #{description => <<"No Content">>}, + <<"404">> => #{ + description => <<"Bad Request">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"error">>), + examples => #{ + example1 => #{ + summary => <<"Not Found">>, + value => #{ + code => <<"NOT_FOUND">>, + message => <<"rule xxx not found">> + } + } + } + } + } + }, + <<"400">> => #{ + description => <<"Bad Request">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"error">>), + examples => #{ + example1 => #{ + summary => <<"Bad Request">>, + value => #{ + code => <<"BAD_REQUEST">>, + message => <<"Bad Request">> + } + } + } + } + } + } + } + }, + delete => #{ + description => "Delete rule", + parameters => [ + #{ + name => id, + in => path, + schema => #{ + type => string + }, + required => true + } + ], + responses => #{ + <<"204">> => #{description => <<"No Content">>}, + <<"400">> => #{ + description => <<"Bad Request">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"error">>), + examples => #{ + example1 => #{ + summary => <<"Bad Request">>, + value => #{ + code => <<"BAD_REQUEST">>, + message => <<"Bad Request">> + } + } + } + } + } + } + } + } + }, + {"/authorization/:id", Metadata, rule}. -form_rules(Params) -> - Params. +move_rule_api() -> + Metadata = #{ + post => #{ + description => "Change the order of rules", + parameters => [ + #{ + name => id, + in => path, + schema => #{ + type => string + }, + required => true + } + ], + requestBody => #{ + content => #{ + 'application/json' => #{ + schema => #{ + type => object, + required => [position], + properties => #{ + position => #{ + oneOf => [ + #{type => string, + enum => [<<"top">>, <<"bottom">>] + }, + #{type => object, + required => ['after'], + properties => #{ + 'after' => #{ + type => string + } + } + }, + #{type => object, + required => ['before'], + properties => #{ + 'before' => #{ + type => string + } + } + } + ] + } + } + } + } + } + }, + responses => #{ + <<"204">> => #{ + description => <<"No Content">> + }, + <<"404">> => #{ + description => <<"Bad Request">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"error">>), + examples => #{ + example1 => #{ + summary => <<"Not Found">>, + value => #{ + code => <<"NOT_FOUND">>, + message => <<"rule xxx not found">> + } + } + } + } + } + }, + <<"400">> => #{ + description => <<"Bad Request">>, + content => #{ + 'application/json' => #{ + schema => minirest:ref(<<"error">>), + examples => #{ + example1 => #{ + summary => <<"Bad Request">>, + value => #{ + code => <<"BAD_REQUEST">>, + message => <<"Bad Request">> + } + } + } + } + } + } + } + } + }, + {"/authorization/:id/move", Metadata, move_rule}. -%%-------------------------------------------------------------------- -%% EUnits -%%-------------------------------------------------------------------- +rules(get, Request) -> + Rules = lists:foldl(fun (#{type := _Type, enable := true, annotations := #{id := Id} = Annotations} = Rule, AccIn) -> + NRule = case emqx_resource:health_check(Id) of + ok -> + Rule#{annotations => Annotations#{status => healthy}}; + _ -> + Rule#{annotations => Annotations#{status => unhealthy}} + end, + lists:append(AccIn, [NRule]); + (Rule, AccIn) -> + lists:append(AccIn, [Rule]) + end, [], emqx_authz:lookup()), + Query = cowboy_req:parse_qs(Request), + case lists:keymember(<<"page">>, 1, Query) andalso lists:keymember(<<"limit">>, 1, Query) of + true -> + {<<"page">>, Page} = lists:keyfind(<<"page">>, 1, Query), + {<<"limit">>, Limit} = lists:keyfind(<<"limit">>, 1, Query), + Index = (binary_to_integer(Page) - 1) * binary_to_integer(Limit), + {_, Rules1} = lists:split(Index, Rules), + case binary_to_integer(Limit) < length(Rules1) of + true -> + {Rules2, _} = lists:split(binary_to_integer(Limit), Rules1), + {200, #{rules => Rules2}}; + false -> {200, #{rules => Rules1}} + end; + false -> {200, #{rules => Rules}} + end; +rules(post, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + RawConfig = jsx:decode(Body, [return_maps]), + case emqx_authz:update(head, [RawConfig]) of + ok -> {204}; + {error, Reason} -> + {400, #{code => <<"BAD_REQUEST">>, + messgae => atom_to_binary(Reason)}} + end; +rules(put, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + RawConfig = jsx:decode(Body, [return_maps]), + case emqx_authz:update(replace, RawConfig) of + ok -> {204}; + {error, Reason} -> + {400, #{code => <<"BAD_REQUEST">>, + messgae => atom_to_binary(Reason)}} + end. --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). +rule(get, Request) -> + Id = cowboy_req:binding(id, Request), + case emqx_authz:lookup(Id) of + {error, Reason} -> {404, #{messgae => atom_to_binary(Reason)}}; + Rule -> + case maps:get(type, Rule, undefined) of + undefined -> {200, Rule}; + _ -> + case emqx_resource:health_check(Id) of + ok -> + {200, Rule#{annotations => #{status => healthy}}}; + _ -> + {200, Rule#{annotations => #{status => unhealthy}}} + end - --endif. - -return(_) -> -%% TODO: V5 api - ok. \ No newline at end of file + end + end; +rule(put, Request) -> + RuleId = cowboy_req:binding(id, Request), + {ok, Body, _} = cowboy_req:read_body(Request), + RawConfig = jsx:decode(Body, [return_maps]), + case emqx_authz:update({replace_once, RuleId}, RawConfig) of + ok -> {204}; + {error, not_found_rule} -> + {404, #{code => <<"NOT_FOUND">>, + messgae => <<"rule ", RuleId/binary, " not found">>}}; + {error, Reason} -> + {400, #{code => <<"BAD_REQUEST">>, + messgae => atom_to_binary(Reason)}} + end; +rule(delete, Request) -> + RuleId = cowboy_req:binding(id, Request), + case emqx_authz:update({replace_once, RuleId}, #{}) of + ok -> {204}; + {error, Reason} -> + {400, #{code => <<"BAD_REQUEST">>, + messgae => atom_to_binary(Reason)}} + end. +move_rule(post, Request) -> + RuleId = cowboy_req:binding(id, Request), + {ok, Body, _} = cowboy_req:read_body(Request), + #{<<"position">> := Position} = jsx:decode(Body, [return_maps]), + case emqx_authz:move(RuleId, Position) of + ok -> {204}; + {error, not_found_rule} -> + {404, #{code => <<"NOT_FOUND">>, + messgae => <<"rule ", RuleId/binary, " not found">>}}; + {error, Reason} -> + {400, #{code => <<"BAD_REQUEST">>, + messgae => atom_to_binary(Reason)}} + end. diff --git a/apps/emqx_authz/src/emqx_authz_api_schema.erl b/apps/emqx_authz/src/emqx_authz_api_schema.erl new file mode 100644 index 000000000..2dcc7c564 --- /dev/null +++ b/apps/emqx_authz/src/emqx_authz_api_schema.erl @@ -0,0 +1,157 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_api_schema). + +-export([definitions/0]). + +definitions() -> + RetruenedRules = #{ + allOf => [ #{type => object, + properties => #{ + annotations => #{ + type => object, + required => [id], + properties => #{ + id => #{ + type => string + }, + principal => minirest:ref(<<"principal">>) + } + + } + + } + } + , minirest:ref(<<"rules">>) + ] + }, + Rules = #{ + oneOf => [ minirest:ref(<<"simple_rule">>) + % , minirest:ref(<<"connector_redis">>) + ] + }, + % ConnectorRedis = #{ + % type => object, + % required => [principal, type, enable, config, cmd] + % properties => #{ + % principal => minirest:ref(<<"principal">>), + % type => #{ + % type => string, + % enum => [<<"redis">>], + % example => <<"redis">> + % }, + % enable => #{ + % type => boolean, + % example => true + % } + % config => #{ + % type => + % } + % } + % } + SimpleRule = #{ + type => object, + required => [principal, permission, action, topics], + properties => #{ + action => #{ + type => string, + enum => [<<"publish">>, <<"subscribe">>, <<"all">>], + example => <<"publish">> + }, + permission => #{ + type => string, + enum => [<<"allow">>, <<"deny">>], + example => <<"allow">> + }, + topics => #{ + type => array, + items => #{ + oneOf => [ #{type => string, example => <<"#">>} + , #{type => object, + required => [eq], + properties => #{ + eq => #{type => string} + }, + example => #{eq => <<"#">>} + } + ] + } + }, + principal => minirest:ref(<<"principal">>) + } + }, + Principal = #{ + oneOf => [ minirest:ref(<<"principal_username">>) + , minirest:ref(<<"principal_clientid">>) + , minirest:ref(<<"principal_ipaddress">>) + , #{type => string, enum=>[<<"all">>], example => <<"all">>} + , #{type => object, + required => ['and'], + properties => #{'and' => #{type => array, + items => #{oneOf => [ minirest:ref(<<"principal_username">>) + , minirest:ref(<<"principal_clientid">>) + , minirest:ref(<<"principal_ipaddress">>) + ]}}}, + example => #{'and' => [#{username => <<"emqx">>}, #{clientid => <<"emqx">>}]} + } + , #{type => object, + required => ['or'], + properties => #{'and' => #{type => array, + items => #{oneOf => [ minirest:ref(<<"principal_username">>) + , minirest:ref(<<"principal_clientid">>) + , minirest:ref(<<"principal_ipaddress">>) + ]}}}, + example => #{'or' => [#{username => <<"emqx">>}, #{clientid => <<"emqx">>}]} + } + ] + }, + PrincipalUsername = #{type => object, + required => [username], + properties => #{username => #{type => string}}, + example => #{username => <<"emqx">>} + }, + PrincipalClientid = #{type => object, + required => [clientid], + properties => #{clientid => #{type => string}}, + example => #{clientid => <<"emqx">>} + }, + PrincipalIpaddress = #{type => object, + required => [ipaddress], + properties => #{ipaddress => #{type => string}}, + example => #{ipaddress => <<"127.0.0.1">>} + }, + ErrorDef = #{ + type => object, + properties => #{ + code => #{ + type => string, + example => <<"BAD_REQUEST">> + }, + message => #{ + type => string + } + } + }, + [ #{<<"returned_rules">> => RetruenedRules} + , #{<<"rules">> => Rules} + , #{<<"simple_rule">> => SimpleRule} + , #{<<"principal">> => Principal} + , #{<<"principal_username">> => PrincipalUsername} + , #{<<"principal_clientid">> => PrincipalClientid} + , #{<<"principal_ipaddress">> => PrincipalIpaddress} + , #{<<"error">> => ErrorDef} + ]. diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index dd3f38519..6f88fe865 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -22,6 +22,8 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-define(CONF_DEFAULT, <<"authorization: {rules: []}">>). + all() -> emqx_ct:all(?MODULE). @@ -29,81 +31,144 @@ groups() -> []. init_per_suite(Config) -> + ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), ok = emqx_ct_helpers:start_apps([emqx_authz]), ok = emqx_config:update([zones, default, authorization, cache, enable], false), ok = emqx_config:update([zones, default, authorization, enable], true), - emqx_authz:update(replace, []), Config. end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([emqx_authz]). + ok = emqx_authz:update(replace, []), + emqx_ct_helpers:stop_apps([emqx_authz]), + ok. --define(RULE1, #{principal => all, - topics => [<<"#">>], - action => all, - permission => deny} +init_per_testcase(_, Config) -> + ok = emqx_authz:update(replace, []), + Config. + +-define(RULE1, #{<<"principal">> => <<"all">>, + <<"topics">> => [<<"#">>], + <<"action">> => <<"all">>, + <<"permission">> => <<"deny">>} ). --define(RULE2, #{principal => - #{ipaddress => <<"127.0.0.1">>}, - topics => - [#{eq => <<"#">>}, - #{eq => <<"+">>} +-define(RULE2, #{<<"principal">> => + #{<<"ipaddress">> => <<"127.0.0.1">>}, + <<"topics">> => + [#{<<"eq">> => <<"#">>}, + #{<<"eq">> => <<"+">>} ] , - action => all, - permission => allow} + <<"action">> => <<"all">>, + <<"permission">> => <<"allow">>} ). --define(RULE3,#{principal => - #{'and' => [#{username => "^test?"}, - #{clientid => "^test?"} +-define(RULE3,#{<<"principal">> => + #{<<"and">> => [#{<<"username">> => <<"^test?">>}, + #{<<"clientid">> => <<"^test?">>} ]}, - topics => [<<"test">>], - action => publish, - permission => allow} + <<"topics">> => [<<"test">>], + <<"action">> => <<"publish">>, + <<"permission">> => <<"allow">>} ). --define(RULE4,#{principal => - #{'or' => [#{username => <<"^test">>}, - #{clientid => <<"test?">>} - ]}, - topics => [<<"%u">>,<<"%c">>], - action => publish, - permission => deny} +-define(RULE4,#{<<"principal">> => + #{<<"or">> => [#{<<"username">> => <<"^test">>}, + #{<<"clientid">> => <<"test?">>} + ]}, + <<"topics">> => [<<"%u">>,<<"%c">>], + <<"action">> => <<"publish">>, + <<"permission">> => <<"deny">>} ). - %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ -t_init_rule(_) -> - ?assertMatch(#{annotations := #{id := _ID, - principal := all, - topics := [['#']]} - }, emqx_authz:init_rule(?RULE1)), - ?assertMatch(#{annotations := #{principal := - #{ipaddress := {{127,0,0,1},{127,0,0,1},32}}, - topics := [#{eq := ['#']}, - #{eq := ['+']}], - id := _ID} - }, emqx_authz:init_rule(?RULE2)), - ?assertMatch(#{annotations := - #{principal := + +t_update_rule(_) -> + ok = emqx_authz:update(replace, [?RULE2]), + ok = emqx_authz:update(head, [?RULE1]), + ok = emqx_authz:update(tail, [?RULE3]), + + Lists1 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE3]), + ?assertMatch(Lists1, emqx_config:get([authorization, rules], [])), + + [#{annotations := #{id := Id1, + principal := all, + topics := [['#']]} + }, + #{annotations := #{id := Id2, + principal := #{ipaddress := {{127,0,0,1},{127,0,0,1},32}}, + topics := [#{eq := ['#']}, #{eq := ['+']}]} + }, + #{annotations := #{id := Id3, + principal := #{'and' := [#{username := {re_pattern, _, _, _, _}}, #{clientid := {re_pattern, _, _, _, _}} ] }, - topics := [[<<"test">>]], - id := _ID} - }, emqx_authz:init_rule(?RULE3)), - ?assertMatch(#{annotations := - #{principal := - #{'or' := [#{username := {re_pattern, _, _, _, _}}, - #{clientid := {re_pattern, _, _, _, _}} - ] - }, - topics := [#{pattern := [<<"%u">>]}, - #{pattern := [<<"%c">>]} - ], - id := _ID} - }, emqx_authz:init_rule(?RULE4)), + topics := [[<<"test">>]]} + } + ] = emqx_authz:lookup(), + + ok = emqx_authz:update({replace_once, Id3}, ?RULE4), + Lists2 = emqx_authz:check_rules([?RULE1, ?RULE2, ?RULE4]), + ?assertMatch(Lists2, emqx_config:get([authorization, rules], [])), + + [#{annotations := #{id := Id1, + principal := all, + topics := [['#']]} + }, + #{annotations := #{id := Id2, + principal := #{ipaddress := {{127,0,0,1},{127,0,0,1},32}}, + topics := [#{eq := ['#']}, + #{eq := ['+']}]} + }, + #{annotations := #{id := Id3, + principal := + #{'or' := [#{username := {re_pattern, _, _, _, _}}, + #{clientid := {re_pattern, _, _, _, _}} + ] + }, + topics := [#{pattern := [<<"%u">>]}, + #{pattern := [<<"%c">>]} + ]} + } + ] = emqx_authz:lookup(), + + ok = emqx_authz:update(replace, []). + +t_move_rule(_) -> + ok = emqx_authz:update(replace, [?RULE1, ?RULE2, ?RULE3, ?RULE4]), + [#{annotations := #{id := Id1}}, + #{annotations := #{id := Id2}}, + #{annotations := #{id := Id3}}, + #{annotations := #{id := Id4}} + ] = emqx_authz:lookup(), + + ok = emqx_authz:move(Id4, <<"top">>), + ?assertMatch([#{annotations := #{id := Id4}}, + #{annotations := #{id := Id1}}, + #{annotations := #{id := Id2}}, + #{annotations := #{id := Id3}} + ], emqx_authz:lookup()), + + ok = emqx_authz:move(Id1, <<"bottom">>), + ?assertMatch([#{annotations := #{id := Id4}}, + #{annotations := #{id := Id2}}, + #{annotations := #{id := Id3}}, + #{annotations := #{id := Id1}} + ], emqx_authz:lookup()), + + ok = emqx_authz:move(Id3, #{<<"before">> => Id4}), + ?assertMatch([#{annotations := #{id := Id3}}, + #{annotations := #{id := Id4}}, + #{annotations := #{id := Id2}}, + #{annotations := #{id := Id1}} + ], emqx_authz:lookup()), + + ok = emqx_authz:move(Id2, #{<<"after">> => Id1}), + ?assertMatch([#{annotations := #{id := Id3}}, + #{annotations := #{id := Id4}}, + #{annotations := #{id := Id1}}, + #{annotations := #{id := Id2}} + ], emqx_authz:lookup()), ok. t_authz(_) -> @@ -132,10 +197,10 @@ t_authz(_) -> listener => mqtt_tcp }, - Rules1 = [emqx_authz:init_rule(Rule) || Rule <- [?RULE1, ?RULE2]], - Rules2 = [emqx_authz:init_rule(Rule) || Rule <- [?RULE2, ?RULE1]], - Rules3 = [emqx_authz:init_rule(Rule) || Rule <- [?RULE3, ?RULE4]], - Rules4 = [emqx_authz:init_rule(Rule) || Rule <- [?RULE4, ?RULE1]], + Rules1 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE1, ?RULE2])], + Rules2 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE2, ?RULE1])], + Rules3 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE3, ?RULE4])], + Rules4 = [emqx_authz:init_rule(Rule) || Rule <- emqx_authz:check_rules([?RULE4, ?RULE1])], ?assertEqual({stop, deny}, emqx_authz:authorize(ClientInfo1, subscribe, <<"#">>, deny, [])), diff --git a/apps/emqx_authz/test/emqx_authz_api_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_SUITE.erl index 0bb1b2132..3ed55e6e1 100644 --- a/apps/emqx_authz/test/emqx_authz_api_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_SUITE.erl @@ -18,143 +18,207 @@ -compile(nowarn_export_all). -compile(export_all). -% -include("emqx_authz.hrl"). -% -include_lib("eunit/include/eunit.hrl"). -% -include_lib("common_test/include/ct.hrl"). +-include("emqx_authz.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -% -import(emqx_ct_http, [ request_api/3 -% , request_api/5 -% , get_http_data/1 -% , create_default_app/0 -% , delete_default_app/0 -% , default_auth_header/0 -% ]). +-import(emqx_ct_http, [ request_api/3 + , request_api/5 + , get_http_data/1 + , create_default_app/0 + , delete_default_app/0 + , default_auth_header/0 + , auth_header/2 + ]). -% -define(HOST, "http://127.0.0.1:8081/"). -% -define(API_VERSION, "v4"). -% -define(BASE_PATH, "api"). +-define(HOST, "http://127.0.0.1:8081/"). +-define(API_VERSION, "v5"). +-define(BASE_PATH, "api"). --define(CONF_DEFAULT, <<""" -authorization:{ - rules: [ - ] -} -""">>). +-define(RULE1, #{<<"principal">> => <<"all">>, + <<"topics">> => [<<"#">>], + <<"action">> => <<"all">>, + <<"permission">> => <<"deny">>} + ). +-define(RULE2, #{<<"principal">> => + #{<<"ipaddress">> => <<"127.0.0.1">>}, + <<"topics">> => + [#{<<"eq">> => <<"#">>}, + #{<<"eq">> => <<"+">>} + ] , + <<"action">> => <<"all">>, + <<"permission">> => <<"allow">>} + ). +-define(RULE3,#{<<"principal">> => + #{<<"and">> => [#{<<"username">> => <<"^test?">>}, + #{<<"clientid">> => <<"^test?">>} + ]}, + <<"topics">> => [<<"test">>], + <<"action">> => <<"publish">>, + <<"permission">> => <<"allow">>} + ). +-define(RULE4,#{<<"principal">> => + #{<<"or">> => [#{<<"username">> => <<"^test">>}, + #{<<"clientid">> => <<"test?">>} + ]}, + <<"topics">> => [<<"%u">>,<<"%c">>], + <<"action">> => <<"publish">>, + <<"permission">> => <<"deny">>} + ). all() -> -%% TODO: V5 API -%% emqx_ct:all(?MODULE). - [t_api_unit_test]. + emqx_ct:all(?MODULE). groups() -> []. init_per_suite(Config) -> - ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), - ok = emqx_ct_helpers:start_apps([emqx_authz]), + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + ok = emqx_ct_helpers:start_apps([emqx_management, emqx_authz], fun set_special_configs/1), + ok = emqx_config:update([zones, default, authorization, cache, enable], false), + ok = emqx_config:update([zones, default, authorization, enable], true), - %create_default_app(), Config. end_per_suite(_Config) -> ok = emqx_authz:update(replace, []), - emqx_ct_helpers:stop_apps([emqx_authz]), + emqx_ct_helpers:stop_apps([emqx_authz, emqx_management]), ok. -% set_special_configs(emqx) -> -% application:set_env(emqx, allow_anonymous, true), -% application:set_env(emqx, enable_authz_cache, false), -% ok; -% set_special_configs(emqx_authz) -> -% emqx_config:put([emqx_authz], #{rules => []}), -% ok; +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(emqx_authz) -> + emqx_config:put([authorization], #{rules => []}), + ok; +set_special_configs(_App) -> + ok. -% set_special_configs(emqx_management) -> -% emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], -% applications =>[#{id => "admin", secret => "public"}]}), -% ok; +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ -% set_special_configs(_App) -> -% ok. +t_api(_) -> + {ok, 200, Result1} = request(get, uri(["authorization"]), []), + ?assertEqual([], get_rules(Result1)), -% %%------------------------------------------------------------------------------ -% %% Testcases -% %%------------------------------------------------------------------------------ + lists:foreach(fun(_) -> + {ok, 204, _} = request(post, uri(["authorization"]), + #{<<"action">> => <<"all">>, + <<"permission">> => <<"deny">>, + <<"principal">> => <<"all">>, + <<"topics">> => [<<"#">>]} + ) + end, lists:seq(1, 20)), + {ok, 200, Result2} = request(get, uri(["authorization"]), []), + ?assertEqual(20, length(get_rules(Result2))), -t_api_unit_test(_Config) -> - %% TODO: Decode from JSON or HOCON, instead of hand-crafting decode result - Rule1 = #{<<"principal">> => - #{<<"and">> => [#{<<"username">> => <<"^test?">>}, - #{<<"clientid">> => <<"^test?">>} - ]}, - <<"action">> => <<"subscribe">>, - <<"topics">> => [<<"%u">>], - <<"permission">> => <<"allow">> - }, - ok = emqx_authz_api:push_authz(#{}, Rule1), - [#{action := subscribe, - permission := allow, - principal := - #{'and' := [#{username := <<"^test?">>}, - #{clientid := <<"^test?">>}]}, - topics := [<<"%u">>]}] = emqx_config:get([authorization, rules]). + lists:foreach(fun(Page) -> + Query = "?page=" ++ integer_to_list(Page) ++ "&&limit=10", + Url = uri(["authorization" ++ Query]), + {ok, 200, Result} = request(get, Url, []), + ?assertEqual(10, length(get_rules(Result))) + end, lists:seq(1, 2)), -% t_api(_Config) -> -% Rule1 = #{<<"principal">> => -% #{<<"and">> => [#{<<"username">> => <<"^test?">>}, -% #{<<"clientid">> => <<"^test?">>} -% ]}, -% <<"action">> => <<"subscribe">>, -% <<"topics">> => [<<"%u">>], -% <<"permission">> => <<"allow">> -% }, -% {ok, _} = request_http_rest_add(["authz/push"], #{rules => [Rule1]}), -% {ok, Result1} = request_http_rest_lookup(["authz"]), -% ?assertMatch([Rule1 | _ ], get_http_data(Result1)), + {ok, 204, _} = request(put, uri(["authorization"]), + [ #{<<"action">> => <<"all">>, <<"permission">> => <<"allow">>, <<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]} + , #{<<"action">> => <<"all">>, <<"permission">> => <<"allow">>, <<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]} + , #{<<"action">> => <<"all">>, <<"permission">> => <<"allow">>, <<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]} + ]), -% Rule2 = #{<<"principal">> => #{<<"ipaddress">> => <<"127.0.0.1">>}, -% <<"action">> => <<"publish">>, -% <<"topics">> => [#{<<"eq">> => <<"#">>}, -% #{<<"eq">> => <<"+">>} -% ], -% <<"permission">> => <<"deny">> -% }, -% {ok, _} = request_http_rest_add(["authz/append"], #{rules => [Rule2]}), -% {ok, Result2} = request_http_rest_lookup(["authz"]), -% ?assertEqual(Rule2#{<<"principal">> => #{<<"ipaddress">> => "127.0.0.1"}}, -% lists:last(get_http_data(Result2))), + {ok, 200, Result3} = request(get, uri(["authorization"]), []), + Rules = get_rules(Result3), + ?assertEqual(3, length(Rules)), -% {ok, _} = request_http_rest_update(["authz"], #{rules => []}), -% {ok, Result3} = request_http_rest_lookup(["authz"]), -% ?assertEqual([], get_http_data(Result3)), -% ok. + lists:foreach(fun(#{<<"permission">> := Allow}) -> + ?assertEqual(<<"allow">>, Allow) + end, Rules), -% %%-------------------------------------------------------------------- -% %% HTTP Request -% %%-------------------------------------------------------------------- + #{<<"annotations">> := #{<<"id">> := Id}} = lists:nth(2, Rules), -% request_http_rest_list(Path) -> -% request_api(get, uri(Path), default_auth_header()). + {ok, 204, _} = request(put, uri(["authorization", binary_to_list(Id)]), + #{<<"action">> => <<"all">>, <<"permission">> => <<"deny">>, + <<"principal">> => <<"all">>, <<"topics">> => [<<"#">>]}), -% request_http_rest_lookup(Path) -> -% request_api(get, uri([Path]), default_auth_header()). + {ok, 200, Result4} = request(get, uri(["authorization", binary_to_list(Id)]), []), + ?assertMatch(#{<<"annotations">> := #{<<"id">> := Id}, + <<"permission">> := <<"deny">> + }, jsx:decode(Result4)), -% request_http_rest_add(Path, Params) -> -% request_api(post, uri(Path), [], default_auth_header(), Params). + lists:foreach(fun(#{<<"annotations">> := #{<<"id">> := Id}}) -> + {ok, 204, _} = request(delete, uri(["authorization", binary_to_list(Id)]), []) + end, Rules), + {ok, 200, Result5} = request(get, uri(["authorization"]), []), + ?assertEqual([], get_rules(Result5)), + ok. -% request_http_rest_update(Path, Params) -> -% request_api(put, uri([Path]), [], default_auth_header(), Params). +t_move_rule(_) -> + ok = emqx_authz:update(replace, [?RULE1, ?RULE2, ?RULE3, ?RULE4]), + [#{annotations := #{id := Id1}}, + #{annotations := #{id := Id2}}, + #{annotations := #{id := Id3}}, + #{annotations := #{id := Id4}} + ] = emqx_authz:lookup(), -% request_http_rest_delete(Login) -> -% request_api(delete, uri([Login]), default_auth_header()). + {ok, 204, _} = request(post, uri(["authorization", Id4, "move"]), + #{<<"position">> => <<"top">>}), + ?assertMatch([#{annotations := #{id := Id4}}, + #{annotations := #{id := Id1}}, + #{annotations := #{id := Id2}}, + #{annotations := #{id := Id3}} + ], emqx_authz:lookup()), -% uri() -> uri([]). -% uri(Parts) when is_list(Parts) -> -% NParts = [b2l(E) || E <- Parts], -% ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]). + {ok, 204, _} = request(post, uri(["authorization", Id1, "move"]), + #{<<"position">> => <<"bottom">>}), + ?assertMatch([#{annotations := #{id := Id4}}, + #{annotations := #{id := Id2}}, + #{annotations := #{id := Id3}}, + #{annotations := #{id := Id1}} + ], emqx_authz:lookup()), -% %% @private -% b2l(B) when is_binary(B) -> -% binary_to_list(B); -% b2l(L) when is_list(L) -> -% L. + {ok, 204, _} = request(post, uri(["authorization", Id3, "move"]), + #{<<"position">> => #{<<"before">> => Id4}}), + ?assertMatch([#{annotations := #{id := Id3}}, + #{annotations := #{id := Id4}}, + #{annotations := #{id := Id2}}, + #{annotations := #{id := Id1}} + ], emqx_authz:lookup()), + + {ok, 204, _} = request(post, uri(["authorization", Id2, "move"]), + #{<<"position">> => #{<<"after">> => Id1}}), + ?assertMatch([#{annotations := #{id := Id3}}, + #{annotations := #{id := Id4}}, + #{annotations := #{id := Id1}}, + #{annotations := #{id := Id2}} + ], emqx_authz:lookup()), + + ok. + +%%-------------------------------------------------------------------- +%% HTTP Request +%%-------------------------------------------------------------------- + +request(Method, Url, Body) -> + Request = case Body of + [] -> {Url, [auth_header("admin", "public")]}; + _ -> {Url, [auth_header("admin", "public")], "application/json", jsx:encode(Body)} + end, + ct:pal("Method: ~p, Request: ~p", [Method, Request]), + case httpc:request(Method, Request, [], [{body_format, binary}]) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } -> + {ok, Code, Return}; + {ok, {Reason, _, _}} -> + {error, Reason} + end. + +uri() -> uri([]). +uri(Parts) when is_list(Parts) -> + NParts = [E || E <- Parts], + ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]). + +get_rules(Result) -> + maps:get(<<"rules">>, jsx:decode(Result), []). diff --git a/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl index 9ba6aa843..67f9a3bfe 100644 --- a/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl @@ -31,6 +31,7 @@ groups() -> init_per_suite(Config) -> meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end), + meck:expect(emqx_resource, remove, fun(_) -> ok end ), ok = emqx_ct_helpers:start_apps([emqx_authz]), ok = emqx_config:update([zones, default, authorization, cache, enable], false), diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl index 54aa7d8fc..a1120684e 100644 --- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl @@ -31,6 +31,7 @@ groups() -> init_per_suite(Config) -> meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ), + meck:expect(emqx_resource, remove, fun(_) -> ok end ), ok = emqx_ct_helpers:start_apps([emqx_authz]), diff --git a/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl index 66a6581a8..61a719474 100644 --- a/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl @@ -31,6 +31,7 @@ groups() -> init_per_suite(Config) -> meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ), + meck:expect(emqx_resource, remove, fun(_) -> ok end ), ok = emqx_ct_helpers:start_apps([emqx_authz]), diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index 0eb42bdb8..4a1765589 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -31,6 +31,7 @@ groups() -> init_per_suite(Config) -> meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ), + meck:expect(emqx_resource, remove, fun(_) -> ok end ), ok = emqx_ct_helpers:start_apps([emqx_authz]), diff --git a/apps/emqx_dashboard/src/emqx_dashboard_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_api.erl index b518232af..a56df7ec3 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_api.erl @@ -97,6 +97,14 @@ users_api() -> responses => #{ <<"200">> => response_array_schema(<<"">>, show_user) } + }, + post => #{ + description => <<"Create dashboard users">>, + 'requestBody' => request_body_schema(create_user), + responses => #{ + <<"200">> => response_schema(<<"Create Users successfully">>), + <<"400">> => bad_request() + } } }, {"/users", Metadata, users}. @@ -117,7 +125,7 @@ user_api() -> 'requestBody' => request_body_schema(#{ type => object, properties => #{ - tags => #{ + tag => #{ type => string } } @@ -126,15 +134,6 @@ user_api() -> <<"200">> => response_schema(<<"Update Users successfully">>), <<"400">> => bad_request() } - }, - post => #{ - description => <<"Create dashboard users">>, - parameters => [path_param_username()], - 'requestBody' => request_body_schema(create_user), - responses => #{ - <<"200">> => response_schema(<<"Create Users successfully">>), - <<"400">> => bad_request() - } } }, {"/users/:username", Metadata, user}. @@ -161,7 +160,7 @@ change_pwd_api() -> } } }, - {"/change_pwd/:username", Metadata, change_pwd}. + {"/users/:username/change_pwd", Metadata, change_pwd}. path_param_username() -> #{ @@ -187,14 +186,32 @@ auth(post, Request) -> end. users(get, _Request) -> - {200, [row(User) || User <- emqx_dashboard_admin:all_users()]}. + {200, [row(User) || User <- emqx_dashboard_admin:all_users()]}; + +users(post, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Params = emqx_json:decode(Body, [return_maps]), + Tag = maps:get(<<"tag">>, Params), + Username = maps:get(<<"username">>, Params), + Password = maps:get(<<"password">>, Params), + case ?EMPTY(Username) orelse ?EMPTY(Password) of + true -> + {400, #{code => <<"CREATE_USER_FAIL">>, + message => <<"Username or password undefined">>}}; + false -> + case emqx_dashboard_admin:add_user(Username, Password, Tag) of + ok -> {200}; + {error, Reason} -> + {400, #{code => <<"CREATE_USER_FAIL">>, message => Reason}} + end + end. user(put, Request) -> Username = cowboy_req:binding(username, Request), {ok, Body, _} = cowboy_req:read_body(Request), Params = emqx_json:decode(Body, [return_maps]), - Tags = maps:get(<<"tags">>, Params), - case emqx_dashboard_admin:update_user(Username, Tags) of + Tag = maps:get(<<"tag">>, Params), + case emqx_dashboard_admin:update_user(Username, Tag) of ok -> {200}; {error, Reason} -> {400, #{code => <<"UPDATE_FAIL">>, message => Reason}} @@ -208,24 +225,6 @@ user(delete, Request) -> false -> _ = emqx_dashboard_admin:remove_user(Username), {200} - end; - -user(post, Request) -> - {ok, Body, _} = cowboy_req:read_body(Request), - Params = emqx_json:decode(Body, [return_maps]), - Tags = maps:get(<<"tags">>, Params), - Username = maps:get(<<"username">>, Params), - Password = maps:get(<<"password">>, Params), - case ?EMPTY(Username) orelse ?EMPTY(Password) of - true -> - {400, #{code => <<"CREATE_USER_FAIL">>, - message => <<"Username or password undefined">>}}; - false -> - case emqx_dashboard_admin:add_user(Username, Password, Tags) of - ok -> {200}; - {error, Reason} -> - {400, #{code => <<"CREATE_USER_FAIL">>, message => Reason}} - end end. change_pwd(put, Request) -> @@ -240,8 +239,8 @@ change_pwd(put, Request) -> {400, #{code => <<"CHANGE_PWD_FAIL">>, message => Reason}} end. -row(#mqtt_admin{username = Username, tags = Tags}) -> - #{username => Username, tags => Tags}. +row(#mqtt_admin{username = Username, tags = Tag}) -> + #{username => Username, tag => Tag}. bad_request() -> response_schema(<<"Bad Request">>, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index bf172ee97..91d60e1ab 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -56,7 +56,7 @@ start_link() -> get_collect() -> gen_server:call(whereis(?MODULE), get_collect). init([]) -> - timer(timer:seconds(interval()), collect), + timer(next_interval(), collect), timer(get_today_remaining_seconds(), clear_expire_data), ExpireInterval = emqx_config:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL), State = #{ @@ -68,6 +68,15 @@ init([]) -> }, {ok, State}. +%% @doc every whole interval seconds; +%% example: +%% interval is 10s +%% now 15:01:07 (or 15:07:01 ~ 15:07:10) +%% next will be 15:01:10, 15:01:20, 15:01:30 ... +%% ensure all counters in cluster have sync time +next_interval() -> + (1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1. + interval() -> emqx_config:get([?APP, sample_interval], ?DEFAULT_INTERVAL). @@ -82,17 +91,17 @@ handle_cast(_Req, State) -> {noreply, State}. handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) -> + timer(next_interval(), collect), NewLastCollect = flush(collect_all(Collect), LastCollect), TempCollect1 = temp_collect(TempCollect), - timer(timer:seconds(interval()), collect), {noreply, State#{count => count(), collect => ?COLLECT, temp_collect => TempCollect1, last_collects => NewLastCollect}}; handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) -> + timer(next_interval(), collect), TempCollect1 = temp_collect(TempCollect), - timer(timer:seconds(interval()), collect), {noreply, State#{count => Count - 1, collect => collect_all(Collect), temp_collect => TempCollect1}, hibernate}; @@ -170,4 +179,4 @@ get_today_remaining_seconds() -> get_local_time() -> (calendar:datetime_to_gregorian_seconds(calendar:local_time()) - - calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})) * 1000. + calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 130139780..277b0b1fd 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -8,11 +8,6 @@ -behaviour(minirest_api). --export([ sampling/1 - , sampling/2 - , get_collect/1 - ]). - -export([api_spec/0]). -export([counters/2, current_counters/2]). @@ -36,8 +31,7 @@ monitor_api() -> name => node, in => query, required => false, - schema => #{type => string}, - example => node() + schema => #{type => string} }, #{ name => counter, @@ -47,7 +41,7 @@ monitor_api() -> } ], responses => #{ - <<"200">> => emqx_mgmt_util:response_array_schema(<<"Monitor count data">>, counters)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Monitor count data">>, counters)}}}, {"/monitor", Metadata, counters}. monitor_current_api() -> Metadata = #{ @@ -62,9 +56,6 @@ current_counters_schema() -> #{ type => object, properties => #{ - nodes => #{ - type => integer, - description => <<"Nodes count">>}, connection => #{type => integer}, sent => #{type => integer}, received => #{type => integer}, @@ -72,13 +63,11 @@ current_counters_schema() -> }. counters_schema() -> - Node = #{ - node => #{ - type => string, - example => node() - } - }, - Properties = lists:foldl(fun(K, M) -> maps:merge(M, counters_schema(K)) end, Node, ?COUNTERS), + Fun = + fun(K, M) -> + maps:merge(M, counters_schema(K)) + end, + Properties = lists:foldl(Fun, #{}, ?COUNTERS), #{ counters => #{ type => object, @@ -100,8 +89,7 @@ counters_schema(Name) -> counters(get, Request) -> case cowboy_req:parse_qs(Request) of [] -> - Response = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], - {200, Response}; + {200, get_collect()}; Params -> lookup(Params) end. @@ -144,6 +132,10 @@ format_current_metrics([], Acc) -> format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) -> format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}). +get_collect() -> + Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], + merger_counters(Counters). + get_collect(Node) when Node =:= node() -> emqx_dashboard_collection:get_collect(); get_collect(Node) -> @@ -152,17 +144,61 @@ get_collect(Node) -> Res -> Res end. +merger_counters(ClusterCounters) -> + lists:foldl(fun merger_node_counters/2, #{}, ClusterCounters). + +merger_node_counters(NodeCounters, Counters) -> + maps:fold(fun merger_counter/3, Counters, NodeCounters). + +merger_counter(Key, Counters, Res) -> + case maps:get(Key, Res, undefined) of + undefined -> + Res#{Key => Counters}; + OldCounters -> + NCounters = lists:foldl(fun merger_counter/2, OldCounters, Counters), + Res#{Key => NCounters} + end. + +merger_counter(#{timestamp := Timestamp, count := Value}, Counters) -> + Comparison = + fun(Counter) -> + case maps:get(timestamp, Counter) =:= Timestamp of + true -> + Count = maps:get(count, Counter), + {ok, Counter#{count => Count + Value}}; + false -> + ignore + end + end, + key_replace(Counters, Comparison, #{timestamp => Timestamp, count => Value}). + +key_replace(List, Comparison, Default) -> + key_replace(List, List, Comparison, Default). + +key_replace([], All, _Comparison, Default) -> + [Default | All]; + +key_replace([Term | List], All, Comparison, Default) -> + case Comparison(Term) of + {ok, NTerm} -> + Tail = [NTerm | List], + Header = lists:sublist(All, length(All) - length(Tail)), + lists:append(Header, Tail); + _ -> + key_replace(List, All, Comparison, Default) + end. + sampling(Node) when Node =:= node() -> Time = emqx_dashboard_collection:get_local_time() - 7200000, All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), - maps:put(node, Node, format(lists:sort(All))); + format(lists:sort(All)); sampling(Node) -> rpc:call(Node, ?MODULE, sampling, [Node]). sampling(Node, Counter) when Node =:= node() -> Time = emqx_dashboard_collection:get_local_time() - 7200000, All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), - maps:put(node, Node, format_single(lists:sort(All), Counter)); + format_single(lists:sort(All), Counter); sampling(Node, Counter) -> rpc:call(Node, ?MODULE, sampling, [Node, Counter]). diff --git a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl index 1ffb6786e..da51a2418 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -96,7 +96,7 @@ t_rest_api(_Config) -> , http_post("users", #{<<"username">> => <<"usera">>, <<"password">> => <<"passwd">>}) , http_post("auth", #{<<"username">> => <<"usera">>, <<"password">> => <<"passwd">>}) , http_delete("users/usera") - , http_put("change_pwd/admin", #{<<"old_pwd">> => <<"public">>, <<"new_pwd">> => <<"newpwd">>}) + , http_put("users/admin/change_pwd", #{<<"old_pwd">> => <<"public">>, <<"new_pwd">> => <<"newpwd">>}) , http_post("auth", #{<<"username">> => <<"admin">>, <<"password">> => <<"newpwd">>}) ]], ok. diff --git a/apps/emqx_machine/etc/emqx_machine.conf b/apps/emqx_machine/etc/emqx_machine.conf index 0797a9d70..989665f97 100644 --- a/apps/emqx_machine/etc/emqx_machine.conf +++ b/apps/emqx_machine/etc/emqx_machine.conf @@ -426,16 +426,16 @@ log { ## Limits the total number of characters printed for each log event. ## ## @doc log.chars_limit - ## ValueType: Integer | infinity - ## Range: [0, infinity) - ## Default: infinity - chars_limit: infinity + ## ValueType: unlimited | Integer + ## Range: [0, +Inf) + ## Default: unlimited + chars_limit: unlimited ## Maximum depth for Erlang term log formatting ## and Erlang process message queue inspection. ## ## @doc log.max_depth - ## ValueType: Integer | infinity + ## ValueType: unlimited | Integer ## Default: 80 max_depth: 80 diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 7fb51801a..76a51fc3b 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -138,7 +138,32 @@ start_one_app(App) -> %% 1. due to static static config change %% 2. after join a cluster reboot_apps() -> - [gproc, esockd, ranch, cowboy, ekka, emqx]. %% | ?EMQX_DEP_APPS]. + [ gproc + , esockd + , ranch + , cowboy + , ekka + , emqx + , emqx_prometheus + , emqx_modules + , emqx_dashboard + , emqx_connector + , emqx_gateway + , emqx_statsd + , emqx_resource + , emqx_rule_engine + , emqx_data_bridge + , emqx_bridge_mqtt + , emqx_plugin_libs + , emqx_config_helper + , emqx_management + , emqx_release_helper + , emqx_retainer + , emqx_exhook + , emqx_rule_actions + , emqx_authn + , emqx_authz + ]. sorted_reboot_apps() -> Apps = [{App, app_deps(App)} || App <- reboot_apps()], diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl index 7e3843081..bf695bb19 100644 --- a/apps/emqx_machine/src/emqx_machine_schema.erl +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -161,9 +161,11 @@ fields("log") -> , {"console_handler", ref("console_handler")} , {"file_handlers", ref("file_handlers")} , {"time_offset", t(string(), undefined, "system")} - , {"chars_limit", maybe_infinity(range(1, inf))} + , {"chars_limit", #{type => hoconsc:union([unlimited, range(1, inf)]), + default => unlimited + }} , {"supervisor_reports", t(union([error, progress]), undefined, error)} - , {"max_depth", t(union([infinity, integer()]), + , {"max_depth", t(union([unlimited, integer()]), "kernel.error_logger_format_depth", 80)} , {"formatter", t(union([text, json]), undefined, text)} , {"single_line", t(boolean(), undefined, true)} @@ -188,7 +190,8 @@ fields("log_file_handler") -> [ {"level", t(log_level(), undefined, warning)} , {"file", t(file(), undefined, undefined)} , {"rotation", ref("log_rotation")} - , {"max_size", maybe_infinity(emqx_schema:bytesize(), "10MB")} + , {"max_size", #{type => union([infinity, emqx_schema:bytesize()]), + default => "10MB"}} ]; fields("log_rotation") -> @@ -258,8 +261,8 @@ tr_logger_level(Conf) -> conf_get("log.primary_level", Conf). tr_logger(Conf) -> CharsLimit = case conf_get("log.chars_limit", Conf) of - infinity -> unlimited; - V -> V + unlimited -> unlimited; + V when V > 0 -> V end, SingleLine = conf_get("log.single_line", Conf), FmtName = conf_get("log.formatter", Conf), @@ -378,15 +381,6 @@ t(Type, Mapping, Default, OverrideEnv) -> ref(Field) -> hoconsc:t(hoconsc:ref(Field)). -maybe_infinity(T) -> - maybe_sth(infinity, T, infinity). - -maybe_infinity(T, Default) -> - maybe_sth(infinity, T, Default). - -maybe_sth(What, Type, Default) -> - t(union([What, Type]), undefined, Default). - options(static, Conf) -> [{seeds, [to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, [])]}]; options(mcast, Conf) -> diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index f7b2eb644..7fb700c55 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -283,7 +283,7 @@ list_client_subscriptions(ClientId) -> end. client_subscriptions(Node, ClientId) when Node =:= node() -> - emqx_broker:subscriptions(ClientId); + {Node, emqx_broker:subscriptions(ClientId)}; client_subscriptions(Node, ClientId) -> rpc_call(Node, client_subscriptions, [Node, ClientId]). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 3c0379d2d..1bc01e6d6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -220,16 +220,32 @@ clients_api() -> get => #{ description => <<"List clients">>, parameters => [ + #{ + name => page, + in => query, + required => false, + description => <<"Page">>, + schema => #{type => integer} + }, + #{ + name => limit, + in => query, + required => false, + description => <<"Page limit">>, + schema => #{type => integer} + }, #{ name => node, in => query, required => false, + description => <<"Node name">>, schema => #{type => string} }, #{ name => username, in => query, required => false, + description => <<"User name">>, schema => #{type => string} }, #{ @@ -242,66 +258,77 @@ clients_api() -> name => ip_address, in => query, required => false, + description => <<"IP address">>, schema => #{type => string} }, #{ name => conn_state, in => query, required => false, - schema => #{type => string} + description => <<"The current connection status of the client, the possible values are connected,idle,disconnected">>, + schema => #{type => string, enum => [connected, idle, disconnected]} }, #{ name => clean_start, in => query, required => false, - schema => #{type => string} + description => <<"Whether the client uses a new session">>, + schema => #{type => boolean} }, #{ name => proto_name, in => query, required => false, - schema => #{type => string} + description => <<"Client protocol name, the possible values are MQTT,CoAP,LwM2M,MQTT-SN">>, + schema => #{type => string, enum => ['MQTT', 'CoAP', 'LwM2M', 'MQTT-SN']} }, #{ name => proto_ver, in => query, required => false, + description => <<"Client protocol version">>, schema => #{type => string} }, #{ name => like_clientid, in => query, required => false, + description => <<"Fuzzy search of client identifier by substring method">>, schema => #{type => string} }, #{ name => like_username, in => query, required => false, + description => <<"Client user name, fuzzy search by substring">>, schema => #{type => string} }, #{ name => gte_created_at, in => query, required => false, + description => <<"Search client session creation time by less than or equal method">>, schema => #{type => string} }, #{ name => lte_created_at, in => query, required => false, + description => <<"Search client session creation time by greater than or equal method">>, schema => #{type => string} }, #{ name => gte_connected_at, in => query, required => false, + description => <<"Search client connection creation time by less than or equal method">>, schema => #{type => string} }, #{ name => lte_connected_at, in => query, required => false, + description => <<"Search client connection creation time by greater than or equal method">>, schema => #{type => string} } ], @@ -362,15 +389,6 @@ clients_authz_cache_api() -> {"/clients/:clientid/authz_cache", Metadata, authz_cache}. clients_subscriptions_api() -> - SubscriptionSchema = #{ - type => object, - properties => #{ - topic => #{ - type => string}, - qos => #{ - type => integer, - enum => [0,1,2]}} - }, Metadata = #{ get => #{ description => <<"Get client subscriptions">>, @@ -382,7 +400,7 @@ clients_subscriptions_api() -> }], responses => #{ <<"200">> => - emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, SubscriptionSchema)}} + emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, subscription)}} }, {"/clients/:clientid/subscriptions", Metadata, subscriptions}. @@ -486,9 +504,9 @@ subscribe_batch(post, Request) -> subscriptions(get, Request) -> ClientID = cowboy_req:binding(clientid, Request), - Subs0 = emqx_mgmt:list_client_subscriptions(ClientID), + {Node, Subs0} = emqx_mgmt:list_client_subscriptions(ClientID), Subs = lists:map(fun({Topic, SubOpts}) -> - #{topic => Topic, qos => maps:get(qos, SubOpts)} + #{node => Node, clientid => ClientID, topic => Topic, qos => maps:get(qos, SubOpts)} end, Subs0), {200, Subs}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl new file mode 100644 index 000000000..15bc8af11 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -0,0 +1,189 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_api_configs). + +-behaviour(minirest_api). + +-export([api_spec/0]). + +-export([ config/2 + , config_reset/2 + ]). + +-define(PARAM_CONF_PATH, [#{ + name => conf_path, + in => query, + description => <<"The config path separated by '.' character">>, + required => false, + schema => #{type => string, default => <<".">>} +}]). + +-define(TEXT_BODY(DESCR, SCHEMA), #{ + description => list_to_binary(DESCR), + content => #{ + <<"text/plain">> => #{ + schema => SCHEMA + } + } +}). + +-define(PREFIX, "/configs"). +-define(PREFIX_RESET, "/configs_reset"). + +-define(MAX_DEPTH, 1). + +-define(ERR_MSG(MSG), io_lib:format("~p", [MSG])). + +api_spec() -> + {config_apis() ++ [config_reset_api()], []}. + +config_apis() -> + [config_api(ConfPath, Schema) || {ConfPath, Schema} <- + get_conf_schema(emqx_config:get([]), ?MAX_DEPTH)]. + +config_api(ConfPath, Schema) -> + Path = path_join(ConfPath), + Descr = fun(Str) -> + list_to_binary([Str, " ", path_join(ConfPath, ".")]) + end, + Metadata = #{ + get => #{ + description => Descr("Get configs for"), + responses => #{ + <<"200">> => ?TEXT_BODY("Get configs successfully", Schema), + <<"404">> => emqx_mgmt_util:response_error_schema( + <<"Config not found">>, ['NOT_FOUND']) + } + }, + put => #{ + description => Descr("Update configs for"), + 'requestBody' => ?TEXT_BODY("The format of the request body is depend on the 'conf_path' parameter in the query string", Schema), + responses => #{ + <<"200">> => ?TEXT_BODY("Update configs successfully", Schema), + <<"400">> => emqx_mgmt_util:response_error_schema( + <<"Update configs failed">>, ['UPDATE_FAILED']) + } + } + }, + {?PREFIX ++ "/" ++ Path, Metadata, config}. + +config_reset_api() -> + Metadata = #{ + post => #{ + description => <<"Reset the config entry specified by the query string parameter `conf_path`.
+- For a config entry that has default value, this resets it to the default value; +- For a config entry that has no default value, an error 400 will be returned">>, + parameters => ?PARAM_CONF_PATH, + responses => #{ + <<"200">> => emqx_mgmt_util:response_schema(<<"Remove configs successfully">>), + <<"400">> => emqx_mgmt_util:response_error_schema( + <<"It's not able to reset the config">>, ['INVALID_OPERATION']) + } + } + }, + {?PREFIX_RESET, Metadata, config_reset}. + +%%%============================================================================================== +%% parameters trans +config(get, Req) -> + case emqx_config:find_raw(conf_path(Req)) of + {ok, Conf} -> + {200, Conf}; + {not_found, _, _} -> + {404, #{code => 'NOT_FOUND', message => <<"Config cannot found">>}} + end; + +config(put, Req) -> + Path = conf_path(Req), + ok = emqx_config:update(Path, http_body(Req)), + {200, emqx_config:get_raw(Path)}. + +config_reset(post, Req) -> + %% reset the config specified by the query string param 'conf_path' + Path = conf_path_reset(Req), + case emqx_config:remove(Path ++ conf_path_from_querystr(Req)) of + ok -> {200, emqx_config:get_raw(Path)}; + {error, Reason} -> + {400, ?ERR_MSG(Reason)} + end. + +conf_path_from_querystr(Req) -> + case proplists:get_value(<<"conf_path">>, cowboy_req:parse_qs(Req)) of + undefined -> []; + Path -> string:lexemes(Path, ". ") + end. + +conf_path(Req) -> + <<"/api/v5", ?PREFIX, Path/binary>> = cowboy_req:path(Req), + string:lexemes(Path, "/ "). + +conf_path_reset(Req) -> + <<"/api/v5", ?PREFIX_RESET, Path/binary>> = cowboy_req:path(Req), + string:lexemes(Path, "/ "). + +http_body(Req) -> + {ok, Body, _} = cowboy_req:read_body(Req), + try jsx:decode(Body, [{return_maps, true}]) + catch error:badarg -> Body + end. + +get_conf_schema(Conf, MaxDepth) -> + get_conf_schema([], maps:to_list(Conf), [], MaxDepth). + +get_conf_schema(_BasePath, [], Result, _MaxDepth) -> + Result; +get_conf_schema(BasePath, [{Key, Conf} | Confs], Result, MaxDepth) -> + Path = BasePath ++ [Key], + Depth = length(Path), + Result1 = case is_map(Conf) of + true when Depth < MaxDepth -> + get_conf_schema(Path, maps:to_list(Conf), Result, MaxDepth); + true when Depth >= MaxDepth -> Result; + false -> Result + end, + get_conf_schema(BasePath, Confs, [{Path, gen_schema(Conf)} | Result1], MaxDepth). + +%% TODO: generate from hocon schema +gen_schema(Conf) when is_boolean(Conf) -> + #{type => boolean}; +gen_schema(Conf) when is_binary(Conf); is_atom(Conf) -> + #{type => string}; +gen_schema(Conf) when is_number(Conf) -> + #{type => number}; +gen_schema(Conf) when is_list(Conf) -> + #{type => array, items => case Conf of + [] -> #{}; %% don't know the type + _ -> gen_schema(hd(Conf)) + end}; +gen_schema(Conf) when is_map(Conf) -> + #{type => object, properties => + maps:map(fun(_K, V) -> gen_schema(V) end, Conf)}; +gen_schema(_Conf) -> + %% the conf is not of JSON supported type, it may have been converted + %% by the hocon schema + #{type => string}. + +path_join(Path) -> + path_join(Path, "/"). + +path_join([P], _Sp) -> str(P); +path_join([P | Path], Sp) -> + str(P) ++ Sp ++ path_join(Path, Sp). + +str(S) when is_list(S) -> S; +str(S) when is_binary(S) -> binary_to_list(S); +str(S) when is_atom(S) -> atom_to_list(S). diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 059327f4c..27e8c898a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -67,11 +67,17 @@ subscriptions_api() -> description => <<"Client ID">>, schema => #{type => string} }, + #{ + name => node, + in => query, + description => <<"Node name">>, + schema => #{type => string} + }, #{ name => qos, in => query, description => <<"QoS">>, - schema => #{type => integer} + schema => #{type => integer, enum => [0, 1, 2]} }, #{ name => share, @@ -101,6 +107,8 @@ subscription_schema() -> subscription => #{ type => object, properties => #{ + node => #{ + type => string}, topic => #{ type => string}, clientid => #{ @@ -115,8 +123,12 @@ subscriptions(get, Request) -> list(Params). list(Params) -> - {200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}. - + case proplists:get_value(<<"node">>, Params, undefined) of + undefined -> + {200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}; + Node -> + {200, emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), Params, ?SUBS_QS_SCHEMA, ?query_fun)} + end. format(Items) when is_list(Items) -> [format(Item) || Item <- Items]; @@ -126,10 +138,20 @@ format({{Subscriber, Topic}, Options}) -> format({_Subscriber, Topic, Options = #{share := Group}}) -> QoS = maps:get(qos, Options), - #{topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS}; + #{ + topic => filename:join([<<"$share">>, Group, Topic]), + clientid => maps:get(subid, Options), + qos => QoS, + node => node() + }; format({_Subscriber, Topic, Options}) -> QoS = maps:get(qos, Options), - #{topic => Topic, clientid => maps:get(subid, Options), qos => QoS}. + #{ + topic => Topic, + clientid => maps:get(subid, Options), + qos => QoS, + node => node() + }. %%-------------------------------------------------------------------- %% Query Function diff --git a/apps/emqx_management/src/emqx_mgmt_http.erl b/apps/emqx_management/src/emqx_mgmt_http.erl index 8c7b0d7c5..c795e1de7 100644 --- a/apps/emqx_management/src/emqx_mgmt_http.erl +++ b/apps/emqx_management/src/emqx_mgmt_http.erl @@ -48,6 +48,14 @@ start_listener({Proto, Port, Options}) -> openapi => "3.0.0", info => #{title => "EMQ X API", version => "5.0.0"}, servers => [#{url => ?BASE_PATH}], + tags => [#{ + name => configs, + description => <<"The query string parameter `conf_path` is of jq format.">>, + externalDocs => #{ + description => "Find out more about the path syntax in jq", + url => "https://stedolan.github.io/jq/manual/" + } + }], components => #{ schemas => #{}, securitySchemes => #{ diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 3a96909ff..4a4308b2e 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -61,21 +61,18 @@ emqx_retainer: { ## Storage connect parameters ## - ## Value: mnesia + ## Value: built_in_database ## - connector: - [ - { - type: mnesia - config: { - ## storage_type: ram | disc | disc_only - storage_type: ram + config: { - ## Maximum number of retained messages. 0 means no limit. - ## - ## Value: Number >= 0 - max_retained_messages: 0 - } - } - ] + type: built_in_database + + ## storage_type: ram | disc | disc_only + storage_type: ram + + ## Maximum number of retained messages. 0 means no limit. + ## + ## Value: Number >= 0 + max_retained_messages: 0 + } } diff --git a/apps/emqx_retainer/rebar.config b/apps/emqx_retainer/rebar.config index 7e762cb72..b49f979ac 100644 --- a/apps/emqx_retainer/rebar.config +++ b/apps/emqx_retainer/rebar.config @@ -19,6 +19,6 @@ [{test, [{deps, [ - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}]} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.0"}}}]} ]} ]}. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 86cf98af6..acd119dee 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -338,16 +338,16 @@ get_msg_deliver_quota() -> update_config(#{clear_timer := ClearTimer, release_quota_timer := QuotaTimer} = State, Conf) -> #{enable := Enable, - connector := [Connector | _], + config := Config, flow_control := #{quota_release_interval := QuotaInterval}, msg_clear_interval := ClearInterval} = Conf, - #{connector := [OldConnector | _]} = emqx_config:get([?APP]), + #{config := OldConfig} = emqx_config:get([?APP]), case Enable of true -> - StorageType = maps:get(type, Connector), - OldStrorageType = maps:get(type, OldConnector), + StorageType = maps:get(type, Config), + OldStrorageType = maps:get(type, OldConfig), case OldStrorageType of StorageType -> State#{clear_timer := check_timer(ClearTimer, @@ -368,9 +368,9 @@ update_config(#{clear_timer := ClearTimer, enable_retainer(#{context_id := ContextId} = State, #{msg_clear_interval := ClearInterval, flow_control := #{quota_release_interval := ReleaseInterval}, - connector := [Connector | _]}) -> + config := Config}) -> NewContextId = ContextId + 1, - Context = create_resource(new_context(NewContextId), Connector), + Context = create_resource(new_context(NewContextId), Config), load(Context), State#{enable := true, context_id := NewContextId, @@ -416,14 +416,19 @@ check_timer(Timer, _, _) -> -spec get_backend_module() -> backend(). get_backend_module() -> - [#{type := Backend} | _] = emqx_config:get([?APP, connector]), - erlang:list_to_existing_atom(io_lib:format("~s_~s", [?APP, Backend])). + #{type := Backend} = emqx_config:get([?APP, config]), + ModName = if Backend =:= built_in_database -> + mnesia; + true -> + Backend + end, + erlang:list_to_existing_atom(io_lib:format("~s_~s", [?APP, ModName])). -create_resource(Context, #{type := mnesia, config := Cfg}) -> +create_resource(Context, #{type := built_in_database} = Cfg) -> emqx_retainer_mnesia:create_resource(Cfg), Context; -create_resource(Context, #{type := DB, config := Config}) -> +create_resource(Context, #{type := DB} = Config) -> ResourceID = erlang:iolist_to_binary([io_lib:format("~s_~s", [?APP, DB])]), case emqx_resource:create( ResourceID, diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index dcfeffa8c..34e3e49db 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -33,8 +33,6 @@ -export([create_resource/1]). --define(DEF_MAX_RETAINED_MESSAGES, 0). - -rlog_shard({?RETAINER_SHARD, ?TAB}). -record(retained, {topic, msg, expiry_time}). @@ -229,10 +227,7 @@ make_match_spec(Filter) -> -spec is_table_full() -> boolean(). is_table_full() -> - [#{config := Cfg} | _] = emqx_config:get([?APP, connector]), - Limit = maps:get(max_retained_messages, - Cfg, - ?DEF_MAX_RETAINED_MESSAGES), + #{max_retained_messages := Limit} = emqx_config:get([?APP, config]), Limit > 0 andalso (table_size() >= Limit). -spec table_size() -> non_neg_integer(). diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 96cf80846..df31f647f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -12,18 +12,14 @@ fields("emqx_retainer") -> [ {enable, t(boolean(), false)} , {msg_expiry_interval, t(emqx_schema:duration_ms(), "0s")} , {msg_clear_interval, t(emqx_schema:duration_ms(), "0s")} - , {connector, connector()} , {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))} , {max_payload_size, t(emqx_schema:bytesize(), "1MB")} + , {config, config()} ]; -fields(mnesia_connector) -> - [ {type, ?TYPE(hoconsc:union([mnesia]))} - , {config, ?TYPE(hoconsc:ref(?MODULE, mnesia_connector_cfg))} - ]; - -fields(mnesia_connector_cfg) -> - [ {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)} +fields(mnesia_config) -> + [ {type, ?TYPE(hoconsc:union([built_in_database]))} + , {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)} , {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)} ]; @@ -43,11 +39,8 @@ t(Type, Default, Validator) -> hoconsc:t(Type, #{default => Default, validator => Validator}). -union_array(Item) when is_list(Item) -> - hoconsc:array(hoconsc:union(Item)). - is_pos_integer(V) -> V >= 0. -connector() -> - #{type => union_array([hoconsc:ref(?MODULE, mnesia_connector)])}. +config() -> + #{type => hoconsc:union([hoconsc:ref(?MODULE, mnesia_config)])}. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index de2481580..a2efd0357 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -67,14 +67,13 @@ new_emqx_retainer_conf() -> #{enable => true, msg_expiry_interval => 0, msg_clear_interval => 0, - connector => [#{type => mnesia, - config => - #{max_retained_messages => 0, - storage_type => ram}}], + config => #{type => built_in_database, + max_retained_messages => 0, + storage_type => ram}, flow_control => #{max_read_number => 0, msg_deliver_quota => 0, quota_release_interval => 0}, - max_payload_size => 1024 * 1024}. + max_payload_size => 1024 * 1024}. %%-------------------------------------------------------------------- %% Test Cases diff --git a/bin/emqx b/bin/emqx index 6b6a6504c..a286d2801 100755 --- a/bin/emqx +++ b/bin/emqx @@ -302,9 +302,19 @@ bootstrapd() { # check if a PID is down is_down() { PID="$1" - if kill -s 0 "$PID" 2>/dev/null; then + if ps -p "$PID" >/dev/null; then + # still around + # shellcheck disable=SC2009 # this grep pattern is not a part of the progra names + if ps -p "$PID" | grep -q 'defunct'; then + # zombie state, print parent pid + parent="$(ps -o ppid= -p "$PID" | tr -d ' ')" + echo "WARN: $PID is marked , parent:" + ps -p "$parent" + return 0 + fi return 1 fi + # it's gone return 0 } @@ -484,12 +494,12 @@ case "$1" in exit 1 fi WAIT_TIME="${WAIT_FOR_ERLANG_STOP:-60}" - if ! wait_for "$WAIT_TIME" is_down "$PID"; then + if ! wait_for "$WAIT_TIME" 'is_down' "$PID"; then msg="dangling after ${WAIT_TIME} seconds" # also log to syslog logger -t "${REL_NAME}[${PID}]" "STOP: $msg" # log to user console - echoerr "STOP: $msg" + echoerr "stop failed, $msg" exit 1 fi logger -t "${REL_NAME}[${PID}]" "STOP: OK" diff --git a/rebar.config b/rebar.config index 8d1d7c3d7..b59909dbe 100644 --- a/rebar.config +++ b/rebar.config @@ -10,7 +10,8 @@ {edoc_opts, [{preprocess,true}]}. {erl_opts, [warn_unused_vars,warn_shadow_vars,warn_unused_import, warn_obsolete_guard,compressed, nowarn_unused_import, - {d, snk_kind, msg}]}. + {d, snk_kind, msg} + ]}. {xref_checks,[undefined_function_calls,undefined_functions,locals_not_used, deprecated_function_calls,warnings_as_errors,deprecated_functions]}. @@ -59,7 +60,7 @@ , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {getopt, "1.0.2"} - , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.11.0"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.1.0"}}} diff --git a/rebar.config.erl b/rebar.config.erl index f97aecb55..cfc5ce226 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -52,13 +52,17 @@ overrides() -> [ {add, [ {extra_src_dirs, [{"etc", [{recursive,true}]}]} , {erl_opts, [{compile_info, [{emqx_vsn, get_vsn()}]}]} ]} - , {add, snabbkaffe, - [{erl_opts, common_compile_opts()}]} - ] ++ community_plugin_overrides(). + ] ++ snabbkaffe_overrides() ++ community_plugin_overrides(). community_plugin_overrides() -> [{add, App, [ {erl_opts, [{i, "include"}]}]} || App <- relx_plugin_apps_extra()]. +%% Temporary workaround for a rebar3 erl_opts duplication +%% bug. Ideally, we want to set this define globally +snabbkaffe_overrides() -> + Apps = [snabbkaffe, ekka], + [{add, App, [{erl_opts, [{d, snk_kind, msg}]}]} || App <- Apps]. + config(HasElixir) -> [ {cover_enabled, is_cover_enabled()} , {profiles, profiles()} @@ -129,12 +133,9 @@ test_deps() -> ]. common_compile_opts() -> - AppNames = app_names(), [ debug_info % alwyas include debug_info , {compile_info, [{emqx_vsn, get_vsn()}]} - , {d, snk_kind, msg} ] ++ - [{d, 'EMQX_DEP_APPS', AppNames -- [emqx, emqx_machine]}] ++ [{d, 'EMQX_ENTERPRISE'} || is_enterprise()] ++ [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1" ].