diff --git a/.ci/docker-compose-file/conf.env b/.ci/docker-compose-file/conf.env index 93dfecd2b..0141a6b4b 100644 --- a/.ci/docker-compose-file/conf.env +++ b/.ci/docker-compose-file/conf.env @@ -1,5 +1,5 @@ EMQX_AUTH__LDAP__SERVERS=ldap_server -EMQX_AUTH__MONGO__SERVER=mongo_server:27017 +EMQX_AUTH__MONGO__SERVER=toxiproxy:27017 EMQX_AUTH__MYSQL__SERVER=mysql_server:3306 EMQX_AUTH__MYSQL__USERNAME=root EMQX_AUTH__MYSQL__PASSWORD=public diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml new file mode 100644 index 000000000..005ac40d0 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -0,0 +1,16 @@ +version: '3.9' + +services: + toxiproxy: + container_name: toxiproxy + image: ghcr.io/shopify/toxiproxy:2.5.0 + restart: always + networks: + - emqx_bridge + volumes: + - "./toxiproxy.json:/config/toxiproxy.json" + ports: + - 8474:8474 + command: + - "-host=0.0.0.0" + - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json new file mode 100644 index 000000000..7079b0599 --- /dev/null +++ b/.ci/docker-compose-file/toxiproxy.json @@ -0,0 +1,8 @@ +[ + { + "name": "mongo", + "listen": "0.0.0.0:27017", + "upstream": "mongo:27017", + "enabled": true + } +] diff --git a/.github/actions/detect-profiles/action.yaml b/.github/actions/detect-profiles/action.yaml new file mode 100644 index 000000000..555674c28 --- /dev/null +++ b/.github/actions/detect-profiles/action.yaml @@ -0,0 +1,26 @@ +name: 'Detect profiles' +inputs: + ci_git_token: + required: true + type: string +outputs: + profiles: + description: 'Detected profiles' + value: ${{ steps.detect-profiles.outputs.profiles}} + +runs: + using: composite + steps: + - id: detect-profiles + shell: bash + run: | + git config --global --add safe.directory "$GITHUB_WORKSPACE" + if make emqx-ee --dry-run > /dev/null 2>&1; then + echo "::set-output name=profiles::[\"emqx-ee\"]" + echo "https://ci%40emqx.io:${{ inputs.ci_git_token }}@github.com" > $HOME/.git-credentials + git config --global credential.helper store + echo "EMQX_NAME=emqx-ee" >> $GITHUB_ENV + else + echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]" + echo "EMQX_NAME=emqx" >> $GITHUB_ENV + fi diff --git a/.github/actions/package-macos/action.yaml b/.github/actions/package-macos/action.yaml new file mode 100644 index 000000000..5c0649b17 --- /dev/null +++ b/.github/actions/package-macos/action.yaml @@ -0,0 +1,95 @@ +name: 'Create MacOS package' +inputs: + profile: # emqx, emqx-enterprise + required: true + type: string + otp: # 24.2.1-1, 23.3.4.9-3 + required: true + type: string + os: + required: false + type: string + default: macos-11 + apple_id_password: + required: true + type: string + apple_developer_identity: + required: true + type: string + apple_developer_id_bundle: + required: true + type: string + apple_developer_id_bundle_password: + required: true + type: string + +runs: + using: composite + steps: + - name: prepare + shell: bash + run: | + brew update + brew install curl zip unzip gnu-sed kerl coreutils unixodbc freetds openssl@1.1 + echo "/usr/local/opt/bison/bin" >> $GITHUB_PATH + echo "/usr/local/bin" >> $GITHUB_PATH + - uses: actions/cache@v2 + id: cache + with: + path: ~/.kerl/${{ inputs.otp }} + key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit + - name: build erlang + if: steps.cache.outputs.cache-hit != 'true' + shell: bash + env: + KERL_BUILD_BACKEND: git + OTP_GITHUB_URL: https://github.com/emqx/otp + KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit + run: | + kerl update releases + kerl build ${{ inputs.otp }} + kerl install ${{ inputs.otp }} $HOME/.kerl/${{ inputs.otp }} + - name: build ${{ inputs.profile }} + env: + AUTO_INSTALL_BUILD_DEPS: 1 + APPLE_SIGN_BINARIES: 1 + APPLE_ID: developers@emqx.io + APPLE_TEAM_ID: 26N6HYJLZA + APPLE_ID_PASSWORD: ${{ inputs.apple_id_password }} + APPLE_DEVELOPER_IDENTITY: ${{ inputs.apple_developer_identity }} + APPLE_DEVELOPER_ID_BUNDLE: ${{ inputs.apple_developer_id_bundle }} + APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ inputs.apple_developer_id_bundle_password }} + shell: bash + run: | + . $HOME/.kerl/${{ inputs.otp }}/activate + make ensure-rebar3 + sudo cp rebar3 /usr/local/bin/rebar3 + make ${{ inputs.profile }}-zip + - name: test ${{ inputs.profile }} + shell: bash + run: | + pkg_name=$(basename _packages/${{ inputs.profile }}/${{ inputs.profile }}-*.zip) + unzip -q _packages/${{ inputs.profile }}/$pkg_name + gsed -i '/emqx_telemetry/d' ./emqx/data/loaded_plugins + ./emqx/bin/emqx start || cat emqx/log/erlang.log.1 + ready='no' + for i in {1..10}; do + if curl -fs 127.0.0.1:18083 > /dev/null; then + ready='yes' + break + fi + sleep 1 + done + if [ "$ready" != "yes" ]; then + echo "Timed out waiting for emqx to be ready" + cat emqx/log/erlang.log.1 + exit 1 + fi + ./emqx/bin/emqx_ctl status + if ! ./emqx/bin/emqx stop; then + cat emqx/log/erlang.log.1 || true + cat emqx/log/emqx.log.1 || true + echo "failed to stop emqx" + exit 1 + fi + rm -rf emqx diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index 497233cdc..78b1bf14d 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -23,24 +23,18 @@ jobs: # prepare source with any OTP version, no need for a matrix container: ghcr.io/emqx/emqx-builder/4.4-19:24.1.5-3-ubuntu20.04 outputs: - profiles: ${{ steps.set_profile.outputs.profiles}} + profiles: ${{ steps.detect-profiles.outputs.profiles}} steps: - uses: actions/checkout@v2 with: path: source fetch-depth: 0 - - name: set profile - id: set_profile - shell: bash + - id: detect-profiles working-directory: source - run: | - git config --global --add safe.directory "$GITHUB_WORKSPACE" - if make emqx-ee --dry-run > /dev/null 2>&1; then - echo "::set-output name=profiles::[\"emqx-ee\"]" - else - echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]" - fi + uses: ./.github/actions/detect-profiles + with: + ci_git_token: ${{ secrets.CI_GIT_TOKEN }} - name: get_all_deps if: endsWith(github.repository, 'emqx') run: | @@ -117,89 +111,35 @@ jobs: profile: ${{fromJSON(needs.prepare.outputs.profiles)}} otp: - 24.1.5-3 - macos: - - macos-11 exclude: - profile: emqx-edge - runs-on: ${{ matrix.macos }} + os: + - macos-11 + runs-on: ${{ matrix.os }} + steps: - uses: actions/download-artifact@v2 with: name: source path: . - name: unzip source code - run: unzip -q source.zip - - name: prepare run: | - brew update - brew install curl zip unzip gnu-sed kerl unixodbc freetds - echo "/usr/local/bin" >> $GITHUB_PATH - git config --global credential.helper store - - uses: actions/cache@v2 - id: cache + ln -s . source + unzip -q source.zip + rm source source.zip + - uses: ./.github/actions/package-macos with: - path: ~/.kerl/${{ matrix.otp }} - key: otp-install-${{ matrix.otp }}-${{ matrix.macos }}-static-ssl-disable-hipe-disable-jit - - name: build erlang - if: steps.cache.outputs.cache-hit != 'true' - timeout-minutes: 60 - env: - KERL_BUILD_BACKEND: git - OTP_GITHUB_URL: https://github.com/emqx/otp - KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit - run: | - kerl update releases - kerl build ${{ matrix.otp }} - kerl install ${{ matrix.otp }} $HOME/.kerl/${{ matrix.otp }} - - name: build - env: - APPLE_SIGN_BINARIES: 1 - APPLE_ID: developers@emqx.io - APPLE_TEAM_ID: 26N6HYJLZA - APPLE_ID_PASSWORD: ${{ secrets.APPLE_ID_PASSWORD }} - APPLE_DEVELOPER_IDENTITY: ${{ secrets.APPLE_DEVELOPER_IDENTITY }} - APPLE_DEVELOPER_ID_BUNDLE: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }} - APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }} - working-directory: source - run: | - . $HOME/.kerl/${{ matrix.otp }}/activate - make ensure-rebar3 - sudo cp rebar3 /usr/local/bin/rebar3 - rm -rf _build/${{ matrix.profile }}/lib - make ${{ matrix.profile }}-zip - - name: test - working-directory: source - run: | - set -x - pkg_name=$(find _packages/${{ matrix.profile }} -mindepth 1 -maxdepth 1 -iname \*.zip) - unzip -q $pkg_name - gsed -i '/emqx_telemetry/d' ./emqx/data/loaded_plugins - ./emqx/bin/emqx start || cat emqx/log/erlang.log.1 - ready='no' - for i in {1..10}; do - if curl -fs 127.0.0.1:18083 > /dev/null; then - ready='yes' - break - fi - sleep 1 - done - if [ "$ready" != "yes" ]; then - echo "Timed out waiting for emqx to be ready" - cat emqx/log/erlang.log.1 - exit 1 - fi - ./emqx/bin/emqx_ctl status - if ! ./emqx/bin/emqx stop; then - cat emqx/log/erlang.log.1 || true - cat emqx/log/emqx.log.1 || true - echo "failed to stop emqx" - exit 1 - fi - rm -rf emqx + profile: ${{ matrix.profile }} + otp: ${{ matrix.otp }} + os: ${{ matrix.os }} + apple_id_password: ${{ secrets.APPLE_ID_PASSWORD }} + apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }} + apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }} + apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }} - uses: actions/upload-artifact@v1 with: name: ${{ matrix.profile }}-${{ matrix.otp }} - path: source/_packages/${{ matrix.profile }}/. + path: _packages/${{ matrix.profile }}/. linux: runs-on: ubuntu-20.04 diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index d18aa18f3..8f7e172ae 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -16,7 +16,7 @@ jobs: strategy: fail-fast: false matrix: - erl_otp: + otp: - 24.1.5-3 os: - ubuntu20.04 @@ -32,27 +32,20 @@ jobs: - runs-on: aws-amd64 use-self-hosted: false - container: ghcr.io/emqx/emqx-builder/4.4-19:${{ matrix.erl_otp }}-${{ matrix.os }} + container: ghcr.io/emqx/emqx-builder/4.4-19:${{ matrix.otp }}-${{ matrix.os }} steps: - uses: actions/checkout@v1 - - name: prepare - run: | - git config --global --add safe.directory "$GITHUB_WORKSPACE" - if make emqx-ee --dry-run > /dev/null 2>&1; then - echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials - git config --global credential.helper store - echo "EMQX_NAME=emqx-ee" >> $GITHUB_ENV - else - echo "EMQX_NAME=emqx" >> $GITHUB_ENV - fi + - uses: ./.github/actions/detect-profiles + with: + ci_git_token: ${{ secrets.CI_GIT_TOKEN }} - name: fix-git-unsafe-repository run: git config --global --add safe.directory /__w/emqx/emqx - uses: actions/cache@v2 with: # dialyzer PLTs path: ~/.cache/rebar3/ - key: dialyer-${{ matrix.erl_otp }} + key: dialyer-${{ matrix.otp }} - name: make xref run: make xref - name: make dialyzer @@ -117,65 +110,27 @@ jobs: strategy: fail-fast: false matrix: + profile: + - emqx otp: - - 24.1.5-3 - macos: - - macos-11 - - runs-on: ${{ matrix.macos }} - + - 24.1.5-3 + os: + - macos-11 + runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v1 - - name: prepare - run: | - if make emqx-ee --dry-run > /dev/null 2>&1; then - echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials - git config --global credential.helper store - echo "EMQX_NAME=emqx-ee" >> $GITHUB_ENV - else - echo "EMQX_NAME=emqx" >> $GITHUB_ENV - fi - - name: prepare - run: | - brew update - brew install curl zip unzip gnu-sed kerl unixodbc freetds - echo "/usr/local/bin" >> $GITHUB_PATH - git config --global credential.helper store - - uses: actions/cache@v2 - id: cache + - uses: ./.github/actions/detect-profiles with: - path: ~/.kerl/${{ matrix.otp }} - key: otp-install-${{ matrix.otp }}-${{ matrix.macos }}-static-ssl-disable-hipe-disable-jit - - name: build erlang - if: steps.cache.outputs.cache-hit != 'true' - timeout-minutes: 60 - env: - KERL_BUILD_BACKEND: git - OTP_GITHUB_URL: https://github.com/emqx/otp - KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit - run: | - kerl update releases - kerl build ${{ matrix.otp }} - kerl install ${{ matrix.otp }} $HOME/.kerl/${{ matrix.otp }} - - name: build - env: - APPLE_SIGN_BINARIES: 1 - APPLE_ID: developers@emqx.io - APPLE_TEAM_ID: 26N6HYJLZA - APPLE_ID_PASSWORD: ${{ secrets.APPLE_ID_PASSWORD }} - APPLE_DEVELOPER_IDENTITY: ${{ secrets.APPLE_DEVELOPER_IDENTITY }} - APPLE_DEVELOPER_ID_BUNDLE: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }} - APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }} - run: | - . $HOME/.kerl/${{ matrix.otp }}/activate - make ensure-rebar3 - sudo cp rebar3 /usr/local/bin/rebar3 - make ${EMQX_NAME}-zip - - uses: actions/upload-artifact@v1 - if: failure() + ci_git_token: ${{ secrets.CI_GIT_TOKEN }} + - uses: ./.github/actions/package-macos with: - name: rebar3.crashdump - path: ./rebar3.crashdump + profile: ${{ matrix.profile }} + otp: ${{ matrix.otp }} + os: ${{ matrix.os }} + apple_id_password: ${{ secrets.APPLE_ID_PASSWORD }} + apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }} + apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }} + apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }} - name: test run: | pkg_name=$(find _packages/${EMQX_NAME} -mindepth 1 -maxdepth 1 -iname \*.zip) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 560ce0ed1..c6c5b03bf 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -10,24 +10,18 @@ jobs: container: ghcr.io/emqx/emqx-builder/4.4-19:24.1.5-3-ubuntu20.04 outputs: - profiles: ${{ steps.set_profile.outputs.profiles}} - s3dir: ${{ steps.set_profile.outputs.s3dir}} + profiles: ${{ steps.detect-profiles.outputs.profiles}} steps: - uses: actions/checkout@v2 with: path: source fetch-depth: 0 - - name: set profile - id: set_profile - shell: bash - run: | - cd source - if make emqx-ee --dry-run > /dev/null 2>&1; then - echo "::set-output name=profiles::[\"emqx-ee\"]" - else - echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]" - fi + - id: detect-profiles + working-directory: source + uses: ./.github/actions/detect-profiles + with: + ci_git_token: ${{ secrets.CI_GIT_TOKEN }} upload: runs-on: ubuntu-20.04 diff --git a/.github/workflows/run_cts_tests.yaml b/.github/workflows/run_cts_tests.yaml index 6b05a014e..cfcb7baca 100644 --- a/.github/workflows/run_cts_tests.yaml +++ b/.github/workflows/run_cts_tests.yaml @@ -82,6 +82,7 @@ jobs: - name: docker-compose up run: | docker-compose \ + -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-${{ matrix.connect_type }}.yaml \ -f .ci/docker-compose-file/docker-compose.yaml \ up -d --build @@ -107,11 +108,11 @@ jobs: - name: setup if: matrix.network_type == 'ipv4' run: | - echo "EMQX_AUTH__MONGO__SERVER=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' mongo):27017" >> "$GITHUB_ENV" + echo "EMQX_AUTH__MONGO__SERVER=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' toxiproxy):27017" >> "$GITHUB_ENV" - name: setup if: matrix.network_type == 'ipv6' run: | - echo "EMQX_AUTH__MONGO__SERVER=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.GlobalIPv6Address}}{{end}}' mongo):27017" >> "$GITHUB_ENV" + echo "EMQX_AUTH__MONGO__SERVER=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.GlobalIPv6Address}}{{end}}' toxiproxy):27017" >> "$GITHUB_ENV" - name: set git token run: | if make emqx-ee --dry-run > /dev/null 2>&1; then diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 72fe741e2..6f42730ea 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -6,7 +6,7 @@ on: - v* - e* branches: - - 'main-v4.?' + - 'main-v4.[0-9]?' pull_request: jobs: @@ -54,6 +54,7 @@ jobs: run: | docker-compose \ -f .ci/docker-compose-file/docker-compose.yaml \ + -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ -f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ @@ -81,6 +82,7 @@ jobs: run: | docker-compose \ -f .ci/docker-compose-file/docker-compose.yaml \ + -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ -f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index dcde285b8..54a87dd2a 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -9,22 +9,35 @@ File format: - Use weight-2 heading for releases - One list item per change topic Change log ends with a list of GitHub PRs + +## v4.3.22 + ## v4.3.21 ### Enhancements -- TLS listener memory usage optimization - new option 'hibernate_after' to hibernate TLS process after idling -- TLS listener default buffer size to 4KB - Eliminate uncertainty that the buffer size is set by OS default - +- TLS listener memory usage optimization [#9005](https://github.com/emqx/emqx/pull/9005). + New config `listener.ssl.$NAME.hibernate_after` to hibernate TLS connection process after idling. + Hibernation can reduce RAM usage significantly, but may cost more CPU. + This configuration is by default disabled. + Our preliminary test shows a 50% of RAM usage decline when configured to '5s'. + +- TLS listener default buffer size to 4KB [#9007](https://github.com/emqx/emqx/pull/9007) + Eliminate uncertainty that the buffer size is set by OS default. + +- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) + +- Disable authorization for `api/v4/emqx_prometheus` endpoint. [8955](https://github.com/emqx/emqx/pull/8955) + +- Added a test to prevent a last will testament message to be + published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) + ## v4.3.20 ### Bug fixes - Fix rule-engine update behaviour which may initialize actions for disabled rules. [#8849](https://github.com/emqx/emqx/pull/8849) - Fix JWT plugin don't support non-integer timestamp claims. [#8862](https://github.com/emqx/emqx/pull/8862) -- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) - Fix a possible dead loop caused by shared subscriptions with `shared_dispatch_ack_enabled=true`. [#8918](https://github.com/emqx/emqx/pull/8918) - Fix dashboard binding IP address not working. [#8916](https://github.com/emqx/emqx/pull/8916) - Fix rule SQL topic matching to null values failed. [#8927](https://github.com/emqx/emqx/pull/8927) @@ -32,9 +45,6 @@ File format: `SELECT topic =~ 't' as r FROM "$events/client_connected"`. The topic is a null value as there's no such field in event `$events/client_connected`, so it should return false if match it to a topic. -- Added a test to prevent a last will testament message to be - published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) -- Disable authorization for `api/v4/emqx_prometheus` endpoint. [8955](https://github.com/emqx/emqx/pull/8955) ## v4.3.19 diff --git a/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl index f7071bc17..8529fb143 100644 --- a/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl @@ -408,7 +408,7 @@ t_password_hash(_) -> ok = application:start(emqx_auth_mnesia). t_will_message_connection_denied(Config) when is_list(Config) -> - ClientId = Username = <<"subscriber">>, + ClientId = <<"subscriber">>, Password = <<"p">>, application:stop(emqx_auth_mnesia), ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia]), diff --git a/apps/emqx_auth_mongo/src/emqx_acl_mongo.erl b/apps/emqx_auth_mongo/src/emqx_acl_mongo.erl index 491fad9ad..19e600454 100644 --- a/apps/emqx_auth_mongo/src/emqx_acl_mongo.erl +++ b/apps/emqx_auth_mongo/src/emqx_acl_mongo.erl @@ -79,4 +79,3 @@ feedvar(Str, Var, Val) -> re:replace(Str, Var, Val, [global, {return, binary}]). description() -> "ACL with MongoDB". - diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src b/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src index 13e83387c..2849fe0a8 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_mongo, [{description, "EMQ X Authentication/ACL with MongoDB"}, - {vsn, "4.4.4"}, % strict semver, bump manually! + {vsn, "4.4.5"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_mongo_sup]}, {applications, [kernel,stdlib,mongodb,ecpool]}, diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src b/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src index 7a23082aa..eb62a4a57 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src @@ -1,7 +1,8 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{<<"4\\.4\\.[2-3]">>, + [{"4.4.4",[{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, + {<<"4\\.4\\.[2-3]">>, [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, {"4.4.1", @@ -14,7 +15,8 @@ {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{<<"4\\.4\\.[2-3]">>, + [{"4.4.4",[{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, + {<<"4\\.4\\.[2-3]">>, [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, {"4.4.1", diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl index bfb911707..b3259ab52 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ check/3 , description/0 @@ -38,14 +39,22 @@ , available/3 ]). +-ifdef(TEST). +-export([ is_superuser/3 + , available/4 + ]). +-endif. + check(ClientInfo = #{password := Password}, AuthResult, Env = #{authquery := AuthQuery, superquery := SuperQuery}) -> + ?tp(emqx_auth_mongo_superuser_check_authn_enter, #{}), #authquery{collection = Collection, field = Fields, hash = HashType, selector = Selector} = AuthQuery, Pool = maps:get(pool, Env, ?APP), case query(Pool, Collection, maps:from_list(replvars(Selector, ClientInfo))) of undefined -> ok; {error, Reason} -> + ?tp(emqx_auth_mongo_check_authn_error, #{error => Reason}), ?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]), {stop, AuthResult#{auth_result => not_authorized, anonymous => false}}; UserMap -> @@ -58,6 +67,7 @@ check(ClientInfo = #{password := Password}, AuthResult, end, case Result of ok -> + ?tp(emqx_auth_mongo_superuser_check_authn_ok, #{}), {stop, AuthResult#{is_superuser => is_superuser(Pool, SuperQuery, ClientInfo), anonymous => false, auth_result => success}}; @@ -81,17 +91,24 @@ description() -> "Authentication with MongoDB". is_superuser(_Pool, undefined, _ClientInfo) -> false; is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Selector}, ClientInfo) -> - case query(Pool, Coll, maps:from_list(replvars(Selector, ClientInfo))) of - undefined -> false; - {error, Reason} -> - ?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]), - false; - Row -> - case maps:get(Field, Row, false) of - true -> true; - _False -> false - end - end. + ?tp(emqx_auth_mongo_superuser_query_enter, #{}), + Res = + case query(Pool, Coll, maps:from_list(replvars(Selector, ClientInfo))) of + undefined -> + %% returned when there are no returned documents + false; + {error, Reason} -> + ?tp(emqx_auth_mongo_superuser_query_error, #{error => Reason}), + ?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]), + false; + Row -> + case maps:get(Field, Row, false) of + true -> true; + _False -> false + end + end, + ?tp(emqx_auth_mongo_superuser_query_result, #{is_superuser => Res}), + Res. %%-------------------------------------------------------------------- %% Availability Test @@ -114,6 +131,7 @@ available(Pool, Collection, Query) -> available(Pool, Collection, Query, Fun) -> try Fun(Pool, Collection, Query) of {error, Reason} -> + ?tp(emqx_auth_mongo_available_error, #{error => Reason}), ?LOG(error, "[MongoDB] ~p availability test error: ~0p", [Collection, Reason]), {error, Reason}; Error = #{<<"code">> := Code} -> @@ -144,7 +162,16 @@ test_client_info() -> %%-------------------------------------------------------------------- replvars(VarList, ClientInfo) -> - lists:map(fun(Var) -> replvar(Var, ClientInfo) end, VarList). + lists:foldl( + fun(Var, Selector) -> + case replvar(Var, ClientInfo) of + %% assumes that all fields are binaries... + {unmatchable, Field} -> [{Field, []} | Selector]; + Interpolated -> [Interpolated | Selector] + end + end, + [], + VarList). replvar({Field, <<"%u">>}, #{username := Username}) -> {Field, Username}; @@ -154,8 +181,8 @@ replvar({Field, <<"%C">>}, #{cn := CN}) -> {Field, CN}; replvar({Field, <<"%d">>}, #{dn := DN}) -> {Field, DN}; -replvar(Selector, _ClientInfo) -> - Selector. +replvar({Field, _PlaceHolder}, _ClientInfo) -> + {unmatchable, Field}. %%-------------------------------------------------------------------- %% MongoDB Connect/Query @@ -169,19 +196,57 @@ connect(Opts) -> mongo_api:connect(Type, Hosts, Options, WorkerOptions). query(Pool, Collection, Selector) -> - ecpool:with_client(Pool, fun(Conn) -> mongo_api:find_one(Conn, Collection, Selector, #{}) end). + Timeout = timer:seconds(15), + with_timeout(Timeout, fun() -> + ecpool:with_client(Pool, fun(Conn) -> mongo_api:find_one(Conn, Collection, Selector, #{}) end) + end). query_multi(Pool, Collection, SelectorList) -> + ?tp(emqx_auth_mongo_query_multi_enter, #{}), + Timeout = timer:seconds(45), lists:reverse(lists:flatten(lists:foldl(fun(Selector, Acc1) -> - Batch = ecpool:with_client(Pool, fun(Conn) -> - case mongo_api:find(Conn, Collection, Selector, #{}) of - {error, Reason} -> - ?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]), - []; - [] -> []; - {ok, Cursor} -> - mc_cursor:foldl(fun(O, Acc2) -> [O|Acc2] end, [], Cursor, 1000) - end - end), - [Batch|Acc1] + Res = + with_timeout(Timeout, fun() -> + ecpool:with_client(Pool, fun(Conn) -> + ?tp(emqx_auth_mongo_query_multi_find_selector, #{}), + case find(Conn, Collection, Selector) of + {error, Reason} -> + ?tp(emqx_auth_mongo_query_multi_error, + #{error => Reason}), + ?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]), + []; + [] -> + ?tp(emqx_auth_mongo_query_multi_no_results, #{}), + []; + {ok, Cursor} -> + mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000) + end + end) + end), + case Res of + {error, timeout} -> + ?tp(emqx_auth_mongo_query_multi_error, #{error => timeout}), + ?LOG(error, "[MongoDB] query_multi timeout", []), + Acc1; + Batch -> + [Batch | Acc1] + end end, [], SelectorList))). + +find(Conn, Collection, Selector) -> + try + mongo_api:find(Conn, Collection, Selector, #{}) + catch + K:E:S -> + {error, {K, E, S}} + end. + +with_timeout(Timeout, Fun) -> + try + emqx_misc:nolink_apply(Fun, Timeout) + catch + exit:timeout -> + {error, timeout}; + K:E:S -> + erlang:raise(K, E, S) + end. diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl index a63aa8193..ed8b68a68 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl @@ -30,6 +30,10 @@ , stop/1 ]). +-ifdef(TEST). +-export([with_env/2]). +-endif. + %%-------------------------------------------------------------------- %% Application callbacks %%-------------------------------------------------------------------- diff --git a/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl b/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl index 66f5253d0..1765b3821 100644 --- a/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl +++ b/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl @@ -19,42 +19,98 @@ -compile(export_all). -compile(nowarn_export_all). +-include("emqx_auth_mongo.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). - --define(APP, emqx_auth_mongo). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(POOL(App), ecpool_worker:client(gproc_pool:pick_worker({ecpool, App}))). -define(MONGO_CL_ACL, <<"mqtt_acl">>). -define(MONGO_CL_USER, <<"mqtt_user">>). --define(INIT_ACL, [{<<"username">>, <<"testuser">>, <<"clientid">>, <<"null">>, <<"subscribe">>, [<<"#">>]}, - {<<"username">>, <<"dashboard">>, <<"clientid">>, <<"null">>, <<"pubsub">>, [<<"$SYS/#">>]}, - {<<"username">>, <<"user3">>, <<"clientid">>, <<"null">>, <<"publish">>, [<<"a/b/c">>]}]). +-define(INIT_ACL, [ { <<"username">>, <<"testuser">> + , <<"clientid">>, <<"null">> + , <<"subscribe">>, [<<"#">>] + } + , { <<"username">>, <<"dashboard">> + , <<"clientid">>, <<"null">> + , <<"pubsub">>, [<<"$SYS/#">>] + } + , { <<"username">>, <<"user3">> + , <<"clientid">>, <<"null">> + , <<"publish">>, [<<"a/b/c">>] + } + ]). --define(INIT_AUTH, [{<<"username">>, <<"plain">>, <<"password">>, <<"plain">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, true}, - {<<"username">>, <<"md5">>, <<"password">>, <<"1bc29b36f623ba82aaf6724fd3b16718">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false}, - {<<"username">>, <<"sha">>, <<"password">>, <<"d8f4590320e1343a915b6394170650a8f35d6926">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false}, - {<<"username">>, <<"sha256">>, <<"password">>, <<"5d5b09f6dcb2d53a5fffc60c4ac0d55fabdf556069d6631545f42aa6e3500f2e">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false}, - {<<"username">>, <<"pbkdf2_password">>, <<"password">>, <<"cdedb5281bb2f801565a1122b2563515">>, <<"salt">>, <<"ATHENA.MIT.EDUraeburn">>, <<"is_superuser">>, false}, - {<<"username">>, <<"bcrypt_foo">>, <<"password">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6">>, <<"salt">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.">>, <<"is_superuser">>, false} - ]). +-define(INIT_AUTH, [ { <<"username">>, <<"plain">> + , <<"password">>, <<"plain">> + , <<"salt">>, <<"salt">> + , <<"is_superuser">>, true + } + , { <<"username">>, <<"md5">> + , <<"password">>, <<"1bc29b36f623ba82aaf6724fd3b16718">> + , <<"salt">>, <<"salt">> + , <<"is_superuser">>, false + } + , { <<"username">>, <<"sha">> + , <<"password">>, <<"d8f4590320e1343a915b6394170650a8f35d6926">> + , <<"salt">>, <<"salt">> + , <<"is_superuser">>, false + } + , { <<"username">>, <<"sha256">> + , <<"password">>, <<"5d5b09f6dcb2d53a5fffc60c4ac0d55fabdf556069d6631545f42aa6e3500f2e">> + , <<"salt">>, <<"salt">> + , <<"is_superuser">>, false + } + , { <<"username">>, <<"pbkdf2_password">> + , <<"password">>, <<"cdedb5281bb2f801565a1122b2563515">> + , <<"salt">>, <<"ATHENA.MIT.EDUraeburn">> + , <<"is_superuser">>, false + } + , { <<"username">>, <<"bcrypt_foo">> + , <<"password">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6">> + , <<"salt">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.">> + , <<"is_superuser">>, false + } + , { <<"username">>, <<"user_full">> + , <<"clientid">>, <<"client_full">> + , <<"common_name">>, <<"cn_full">> + , <<"distinguished_name">>, <<"dn_full">> + , <<"password">>, <<"plain">> + , <<"salt">>, <<"salt">> + , <<"is_superuser">>, false + } + ]). %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- all() -> - emqx_ct:all(?MODULE). + OtherTCs = emqx_ct:all(?MODULE) -- resilience_tests(), + [ {group, resilience} + | OtherTCs]. -init_per_suite(Cfg) -> +resilience_tests() -> + [ t_acl_superuser_timeout + , t_available_acl_query_no_connection + , t_available_acl_query_timeout + , t_available_authn_query_timeout + , t_authn_timeout + , t_available + ]. + +groups() -> + [ {resilience, resilience_tests()} + ]. + +init_per_suite(Config) -> emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1), - init_mongo_data(), %% avoid inter-suite flakiness ok = emqx_mod_acl_internal:unload([]), - Cfg. + Config. end_per_suite(_Cfg) -> deinit_mongo_data(), @@ -69,6 +125,81 @@ set_special_confs(emqx) -> set_special_confs(_App) -> ok. +init_per_group(resilience, Config) -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPortStr = os:getenv("PROXY_PORT", "8474"), + ProxyPort = list_to_integer(ProxyPortStr), + reset_proxy(ProxyHost, ProxyPort), + ProxyServer = ProxyHost ++ ":27017", + {ok, OriginalServer} = application:get_env(emqx_auth_mongo, server), + OriginalServerMap = maps:from_list(OriginalServer), + NewServerMap = OriginalServerMap#{hosts => [ProxyServer]}, + NewServer = maps:to_list(NewServerMap), + emqx_ct_helpers:stop_apps([emqx_auth_mongo]), + Handler = + fun(App = emqx_auth_mongo) -> + application:set_env(emqx_auth_mongo, server, NewServer), + set_special_confs(App); + (App)-> + set_special_confs(App) + end, + emqx_ct_helpers:start_apps([emqx_auth_mongo], Handler), + [ {original_server, OriginalServer} + , {proxy_host, ProxyHost} + , {proxy_port, ProxyPort} + | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(resilience, Config) -> + OriginalServer = ?config(original_server, Config), + application:set_env(emqx_auth_mongo, server, OriginalServer), + emqx_ct_helpers:stop_apps([emqx_auth_mongo]), + emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(t_authn_full_selector_variables, Config) -> + {ok, AuthQuery} = application:get_env(emqx_auth_mongo, auth_query), + OriginalSelector = proplists:get_value(selector, AuthQuery), + Selector = [ {<<"username">>, <<"%u">>} + , {<<"clientid">>, <<"%c">>} + , {<<"common_name">>, <<"%C">>} + , {<<"distinguished_name">>, <<"%d">>} + ], + reload({auth_query, [{selector, Selector}]}), + init_mongo_data(), + [ {original_selector, OriginalSelector} + , {selector, Selector} + | Config]; +init_per_testcase(_TestCase, Config) -> + init_mongo_data(), + Config. + +end_per_testcase(t_authn_full_selector_variables, Config) -> + OriginalSelector = ?config(original_selector, Config), + reload({auth_query, [{selector, OriginalSelector}]}), + deinit_mongo_data(), + ok; +end_per_testcase(TestCase, Config) + when TestCase =:= t_available_acl_query_timeout; + TestCase =:= t_acl_superuser_timeout; + TestCase =:= t_authn_no_connection; + TestCase =:= t_available_acl_query_no_connection -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + reset_proxy(ProxyHost, ProxyPort), + %% force restart of clients because CI tends to get stuck... + application:stop(emqx_auth_mongo), + application:start(emqx_auth_mongo), + wait_for_stabilization(#{attempts => 10, interval_ms => 500}), + deinit_mongo_data(), + ok; +end_per_testcase(_TestCase, _Config) -> + deinit_mongo_data(), + ok. + init_mongo_data() -> %% Users {ok, Connection} = ?POOL(?APP), @@ -87,6 +218,14 @@ deinit_mongo_data() -> %% Test cases %%-------------------------------------------------------------------- +%% for full coverage ;-) +t_authn_description(_Config) -> + ?assert(is_list(emqx_auth_mongo:description())). + +%% for full coverage ;-) +t_acl_description(_Config) -> + ?assert(is_list(emqx_acl_mongo:description())). + t_check_auth(_) -> Plain = #{zone => external, clientid => <<"client1">>, username => <<"plain">>}, Plain1 = #{zone => external, clientid => <<"client1">>, username => <<"plain2">>}, @@ -116,7 +255,124 @@ t_check_auth(_) -> {ok, #{is_superuser := false}} = emqx_access_control:authenticate(Pbkdf2#{password => <<"password">>}), reload({auth_query, [{password_hash, {salt, bcrypt}}]}), {ok, #{is_superuser := false}} = emqx_access_control:authenticate(Bcrypt#{password => <<"foo">>}), - {error, _} = emqx_access_control:authenticate(User1#{password => <<"foo">>}). + {error, _} = emqx_access_control:authenticate(User1#{password => <<"foo">>}), + %% bad field config + reload({auth_query, [{password_field, [<<"bad_field">>]}]}), + ?assertEqual({error, password_error}, + emqx_access_control:authenticate(Plain#{password => <<"plain">>})), + %% unknown username + Unknown = #{zone => unknown, clientid => <<"?">>, username => <<"?">>, password => <<"">>}, + ?assertEqual({error, not_authorized}, emqx_access_control:authenticate(Unknown)), + ok. + +t_authn_full_selector_variables(Config) -> + Selector = ?config(selector, Config), + ClientInfo = #{ zone => external + , clientid => <<"client_full">> + , username => <<"user_full">> + , cn => <<"cn_full">> + , dn => <<"dn_full">> + , password => <<"plain">> + }, + ?assertMatch({ok, _}, emqx_access_control:authenticate(ClientInfo)), + EnvFields = [ clientid + , username + , cn + , dn + ], + lists:foreach( + fun(Field) -> + UnauthorizedClientInfo = ClientInfo#{Field => <<"wrong">>}, + ?assertEqual({error, not_authorized}, + emqx_access_control:authenticate(UnauthorizedClientInfo), + #{ field => Field + , client_info => UnauthorizedClientInfo + , selector => Selector + }) + end, + EnvFields), + ok. + +t_authn_interpolation_no_info(_Config) -> + Valid = #{zone => external, clientid => <<"client1">>, + username => <<"plain">>, password => <<"plain">>}, + ?assertMatch({ok, _}, emqx_access_control:authenticate(Valid)), + try + %% has values that are equal to placeholders + InterpolationUser = #{ <<"username">> => <<"%u">> + , <<"password">> => <<"plain">> + , <<"salt">> => <<"salt">> + , <<"is_superuser">> => true + }, + {ok, Conn} = ?POOL(?APP), + {{true, _}, _} = mongo_api:insert(Conn, ?MONGO_CL_USER, InterpolationUser), + Invalid = maps:without([username], Valid), + ?assertMatch({error, not_authorized}, emqx_access_control:authenticate(Invalid)) + after + deinit_mongo_data(), + init_mongo_data() + end. + +%% authenticates, but superquery returns no documents +t_authn_empty_is_superuser_collection(_Config) -> + {ok, SuperQuery} = application:get_env(emqx_auth_mongo, super_query), + Collection = list_to_binary(proplists:get_value(collection, SuperQuery)), + reload({auth_query, [{password_hash, plain}]}), + Plain = #{zone => external, clientid => <<"client1">>, + username => <<"plain">>, password => <<"plain">>}, + ok = snabbkaffe:start_trace(), + ?force_ordering( + #{?snk_kind := emqx_auth_mongo_superuser_check_authn_ok}, + #{?snk_kind := truncate_coll_enter}), + ?force_ordering( + #{?snk_kind := truncate_coll_done}, + #{?snk_kind := emqx_auth_mongo_superuser_query_enter}), + try + spawn_link(fun() -> + ?tp(truncate_coll_enter, #{}), + {ok, Conn} = ?POOL(?APP), + {true, _} = mongo_api:delete(Conn, Collection, _Selector = #{}), + ?tp(truncate_coll_done, #{}) + end), + ?assertMatch({ok, #{is_superuser := false}}, emqx_access_control:authenticate(Plain)), + ok = snabbkaffe:stop(), + ok + after + init_mongo_data() + end. + +t_available(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + Pool = ?APP, + SuperQuery = #superquery{collection = SuperCollection} = superquery(), + %% success; + ?assertEqual(ok, emqx_auth_mongo:available(Pool, SuperQuery)), + %% error with code; + EmptySelector = #{}, + ?assertEqual( + {error, {mongo_error, 2}}, + emqx_auth_mongo:available(Pool, SuperCollection, EmptySelector, fun error_code_query/3)), + %% exception (in query) + ?assertMatch( + {error, _}, + with_failure(down, ProxyHost, ProxyPort, + fun() -> + Collection = <<"mqtt_user">>, + Selector = #{}, + emqx_auth_mongo:available(Pool, Collection, Selector) + end)), + %% exception (arbitrary function) + ?assertMatch( + {error, _}, + with_failure(down, ProxyHost, ProxyPort, + fun() -> + Collection = <<"mqtt_user">>, + Selector = #{}, + RaisingFun = fun(_, _, _) -> error(some_error) end, + emqx_auth_mongo:available(Pool, Collection, Selector, RaisingFun) + end)), + ok. t_check_acl(_) -> {ok, Connection} = ?POOL(?APP), @@ -132,7 +388,30 @@ t_check_acl(_) -> allow = emqx_access_control:check_acl(User2, subscribe, <<"$SYS/testuser/1">>), allow = emqx_access_control:check_acl(User3, publish, <<"a/b/c">>), deny = emqx_access_control:check_acl(User3, publish, <<"c">>), - deny = emqx_access_control:check_acl(User4, publish, <<"a/b/c">>). + deny = emqx_access_control:check_acl(User4, publish, <<"a/b/c">>), + %% undefined value to interpolate + User1Undef = User1#{clientid => undefined}, + allow = emqx_access_control:check_acl(User1Undef, subscribe, <<"users/testuser/1">>), + ok. + +t_acl_empty_results(_Config) -> + #aclquery{selector = Selector} = aclquery(), + User1 = #{zone => external, clientid => <<"client1">>, username => <<"testuser">>}, + try + reload({acl_query, [{selector, []}]}), + ?assertEqual(deny, emqx_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)), + ok + after + reload({acl_query, [{selector, Selector}]}) + end, + ok. + +t_acl_exception(_Config) -> + %% FIXME: is there a more authentic way to produce an exception in + %% `match'??? + User1 = #{zone => external, clientid => not_a_binary, username => <<"testuser">>}, + ?assertEqual(deny, emqx_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)), + ok. t_acl_super(_) -> reload({auth_query, [{password_hash, plain}, {password_field, [<<"password">>]}]}), @@ -155,10 +434,175 @@ t_acl_super(_) -> end, emqtt:disconnect(C). +%% apparently, if the config is undefined in `emqx_auth_mongo_app:r', +%% this is allowed... +t_is_superuser_undefined(_Config) -> + Pool = ClientInfo = unused_in_this_case, + SuperQuery = undefined, + ?assertNot(emqx_auth_mongo:is_superuser(Pool, SuperQuery, ClientInfo)), + ok. + +t_authn_timeout(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + FailureType = timeout, + {ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>}, + {username, <<"plain">>}, + {password, <<"plain">>}]), + unlink(C), + + ?check_trace( + try + enable_failure(FailureType, ProxyHost, ProxyPort), + {error, {unauthorized_client, _}} = emqtt:connect(C), + ok + after + heal_failure(FailureType, ProxyHost, ProxyPort) + end, + fun(Trace) -> + %% fails with `{exit,{{{badmatch,{{error,closed},...' + ?assertMatch([_], ?of_kind(emqx_auth_mongo_check_authn_error, Trace)), + ok + end), + + ok. + +%% tests query timeout failure +t_available_authn_query_timeout(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + FailureType = timeout, + SuperQuery = superquery(), + + ?check_trace( + #{timetrap => timer:seconds(60)}, + try + enable_failure(FailureType, ProxyHost, ProxyPort), + Pool = ?APP, + %% query_multi returns an empty list even on failures. + ?assertEqual({error, timeout}, emqx_auth_mongo:available(Pool, SuperQuery)), + ok + after + heal_failure(FailureType, ProxyHost, ProxyPort) + end, + fun(Trace) -> + ?assertMatch( + [#{?snk_kind := emqx_auth_mongo_available_error , error := _}], + ?of_kind(emqx_auth_mongo_available_error, Trace)) + end), + + ok. + +%% tests query_multi failure +t_available_acl_query_no_connection(Config) -> + test_acl_query_failure(down, Config). + +%% ensure query_multi has a timeout +t_available_acl_query_timeout(Config) -> + ct:timetrap(90000), + test_acl_query_failure(timeout, Config). + +%% checks that `with_timeout' lets unknown errors pass through +t_query_multi_unknown_exception(_Config) -> + ok = meck:new(ecpool, [no_link, no_history, non_strict, passthrough]), + ok = meck:expect(ecpool, with_client, fun(_, _) -> throw(some_error) end), + Pool = ?APP, + Collection = ?MONGO_CL_ACL, + SelectorList = [#{<<"username">> => <<"user">>}], + try + ?assertThrow(some_error, emqx_auth_mongo:query_multi(Pool, Collection, SelectorList)) + after + meck:unload(ecpool) + end. + +t_acl_superuser_timeout(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + FailureType = timeout, + reload({auth_query, [{password_hash, plain}, {password_field, [<<"password">>]}]}), + {ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>}, + {username, <<"plain">>}, + {password, <<"plain">>}]), + unlink(C), + + ?check_trace( + try + ?force_ordering( + #{?snk_kind := emqx_auth_mongo_superuser_check_authn_ok}, + #{?snk_kind := connection_will_cut} + ), + ?force_ordering( + #{?snk_kind := connection_cut}, + #{?snk_kind := emqx_auth_mongo_superuser_query_enter} + ), + spawn(fun() -> + ?tp(connection_will_cut, #{}), + enable_failure(FailureType, ProxyHost, ProxyPort), + ?tp(connection_cut, #{}) + end), + + {ok, _} = emqtt:connect(C), + ok = emqtt:disconnect(C), + ok + after + heal_failure(FailureType, ProxyHost, ProxyPort) + end, + fun(Trace) -> + ?assertMatch( + [ #{ ?snk_kind := emqx_auth_mongo_superuser_query_error + , error := _ + } + , #{ ?snk_kind := emqx_auth_mongo_superuser_query_result + , is_superuser := false + } + ], + ?of_kind([ emqx_auth_mongo_superuser_query_error + , emqx_auth_mongo_superuser_query_result + ], Trace)) + end), + + ok. + %%-------------------------------------------------------------------- %% Utils %%-------------------------------------------------------------------- +test_acl_query_failure(FailureType, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + ACLQuery = aclquery(), + + ?check_trace( + #{timetrap => timer:seconds(60)}, + try + ?force_ordering( + #{?snk_kind := emqx_auth_mongo_query_multi_enter}, + #{?snk_kind := connection_will_cut} + ), + ?force_ordering( + #{?snk_kind := connection_cut}, + #{?snk_kind := emqx_auth_mongo_query_multi_find_selector} + ), + spawn(fun() -> + ?tp(connection_will_cut, #{}), + enable_failure(FailureType, ProxyHost, ProxyPort), + ?tp(connection_cut, #{}) + end), + Pool = ?APP, + %% query_multi returns an empty list even on failures. + ?assertMatch(ok, emqx_auth_mongo:available(Pool, ACLQuery)), + ok + after + heal_failure(FailureType, ProxyHost, ProxyPort) + end, + fun(Trace) -> + ?assertMatch( + [#{?snk_kind := emqx_auth_mongo_query_multi_error , error := _}], + ?of_kind(emqx_auth_mongo_query_multi_error, Trace)) + end), + + ok. + reload({Par, Vals}) when is_list(Vals) -> application:stop(?APP), {ok, TupleVals} = application:get_env(?APP, Par), @@ -171,3 +615,105 @@ reload({Par, Vals}) when is_list(Vals) -> end, TupleVals), application:set_env(?APP, Par, lists:append(NewVals, Vals)), application:start(?APP). + +superquery() -> + emqx_auth_mongo_app:with_env(super_query, fun(SQ) -> SQ end). + +aclquery() -> + emqx_auth_mongo_app:with_env(acl_query, fun(SQ) -> SQ end). + +%% TODO: any easier way to make mongo return a map with an error code??? +error_code_query(Pool, Collection, Selector) -> + %% should be a query; this is to provoke an error return from + %% mongo. + WrongLimit = {}, + ecpool:with_client( + Pool, + fun(Conn) -> + mongoc:transaction_query( + Conn, + fun(Conf = #{pool := Worker}) -> + Query = mongoc:count_query(Conf, Collection, Selector, WrongLimit), + {_, Res} = mc_worker_api:command(Worker, Query), + Res + end) + end). + +wait_for_stabilization(#{attempts := Attempts, interval_ms := IntervalMS}) + when Attempts > 0 -> + try + {ok, Conn} = ?POOL(?APP), + #{} = mongo_api:find_one(Conn, ?MONGO_CL_USER, #{}, #{}), + ok + catch + _:_ -> + ct:pal("mongodb connection still stabilizing... sleeping for ~b ms", [IntervalMS]), + ct:sleep(IntervalMS), + wait_for_stabilization(#{attempts => Attempts - 1, interval_ms => IntervalMS}) + end; +wait_for_stabilization(_) -> + error(mongo_connection_did_not_stabilize). + +%% TODO: move to ct helpers??? +reset_proxy(ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/reset", + Body = <<>>, + {ok, {{_, 204, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [], + [{body_format, binary}]). + +with_failure(FailureType, ProxyHost, ProxyPort, Fun) -> + enable_failure(FailureType, ProxyHost, ProxyPort), + try + Fun() + after + heal_failure(FailureType, ProxyHost, ProxyPort) + end. + +enable_failure(FailureType, ProxyHost, ProxyPort) -> + case FailureType of + down -> switch_proxy(off, ProxyHost, ProxyPort); + timeout -> timeout_proxy(on, ProxyHost, ProxyPort); + latency_up -> latency_up_proxy(on, ProxyHost, ProxyPort) + end. + +heal_failure(FailureType, ProxyHost, ProxyPort) -> + case FailureType of + down -> switch_proxy(on, ProxyHost, ProxyPort); + timeout -> timeout_proxy(off, ProxyHost, ProxyPort); + latency_up -> latency_up_proxy(off, ProxyHost, ProxyPort) + end. + +switch_proxy(Switch, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo", + Body = case Switch of + off -> <<"{\"enabled\":false}">>; + on -> <<"{\"enabled\":true}">> + end, + {ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [], + [{body_format, binary}]). + +timeout_proxy(on, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics", + Body = <<"{\"name\":\"timeout\",\"type\":\"timeout\"," + "\"stream\":\"upstream\",\"toxicity\":1.0," + "\"attributes\":{\"timeout\":0}}">>, + {ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [], + [{body_format, binary}]); +timeout_proxy(off, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics/timeout", + Body = <<>>, + {ok, {{_, 204, _}, _, _}} = httpc:request(delete, {Url, [], "application/json", Body}, [], + [{body_format, binary}]). + +latency_up_proxy(on, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics", + Body = <<"{\"name\":\"latency_up\",\"type\":\"latency\"," + "\"stream\":\"upstream\",\"toxicity\":1.0," + "\"attributes\":{\"latency\":20000,\"jitter\":3000}}">>, + {ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [], + [{body_format, binary}]); +latency_up_proxy(off, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics/latency_up", + Body = <<>>, + {ok, {{_, 204, _}, _, _}} = httpc:request(delete, {Url, [], "application/json", Body}, [], + [{body_format, binary}]). diff --git a/rebar.config b/rebar.config index f1e1aa014..7806d4297 100644 --- a/rebar.config +++ b/rebar.config @@ -59,7 +59,7 @@ , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {getopt, "1.0.1"} - , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}} , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}} , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} diff --git a/scripts/rel/cut4x.sh b/scripts/rel/cut4x.sh new file mode 100755 index 000000000..58131c757 --- /dev/null +++ b/scripts/rel/cut4x.sh @@ -0,0 +1,238 @@ +#!/usr/bin/env bash + +## cut a new 4.x release for EMQX (opensource or enterprise). + +set -euo pipefail +# ensure dir +cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.." + +usage() { + cat < "A script that fills in boilerplate for appup files. @@ -109,7 +112,7 @@ changes, supervisor changes, process restarts and so on. Also the load order of the beam files might need updating.~n"), halt(0); warn_and_exit(false) -> - log("~nERROR: Incomplete appups found. Please inspect the output for more details.~n"), + logerr("Incomplete appups found. Please inspect the output for more details.~n", []), halt(1). prepare(Baseline, Options = #{make_command := MakeCommand, beams_dir := BeamDir}) -> @@ -474,8 +477,8 @@ check_appup(App, Upgrade, Downgrade, OldUpgrade, OldDowngrade) -> ok; {diffs, Diffs} -> set_invalid(), - log("ERROR: Appup file for '~p' is not complete.~n" - "Missing:~100p~n", [App, Diffs]), + logerr("Appup file for '~p' is not complete.~n" + "Missing:~100p~n", [App, Diffs]), notok end. @@ -494,7 +497,7 @@ render_appup(App, File, Up, Down) -> do_render_appup(File, Up, Down); {error, enoent} when IsCheck -> %% failed to read old file, exit - log("ERROR: ~s is missing", [File]), + logerr("~s is missing", [File]), set_invalid() end. @@ -580,8 +583,8 @@ diff_app(UpOrDown, App, case UpOrDown =:= up of true -> %% only log for the upgrade case because it would be the same result - log("ERROR: Application '~p' contains changes, but its version is not updated. ~s", - [App, format_changes(Changes)]); + logerr("Application '~p' contains changes, but its version is not updated. ~s", + [App, format_changes(Changes)]); false -> ok end; @@ -723,5 +726,8 @@ log(Msg) -> log(Msg, Args) -> io:format(standard_error, Msg, Args). +logerr(Msg, Args) -> + io:format(standard_error, ?RED ++ "ERROR: "++ Msg ++ ?RESET, Args). + otp_standard_apps() -> [ssl, mnesia, kernel, asn1, stdlib]. diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 09b877307..aacc5e348 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.9", - [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -13,7 +14,8 @@ {load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.8", - [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -253,7 +255,8 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.9", - [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -264,7 +267,8 @@ {load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.8", - [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index c366b878d..ec303bc3d 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -47,6 +47,8 @@ , index_of/2 , maybe_parse_ip/1 , ipv6_probe/1 + , pmap/2 + , pmap/3 ]). -export([ bin2hexstr_a_f_upper/1 @@ -57,7 +59,13 @@ -export([ is_sane_id/1 ]). +-export([ + nolink_apply/1, + nolink_apply/2 +]). + -define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$"). +-define(DEFAULT_PMAP_TIMEOUT, 5000). -spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}. is_sane_id(Str) -> @@ -330,6 +338,110 @@ hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0; hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10; hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10. +%% @doc Like lists:map/2, only the callback function is evaluated +%% concurrently. +-spec pmap(fun((A) -> B), list(A)) -> list(B). +pmap(Fun, List) when is_function(Fun, 1), is_list(List) -> + pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT). + +-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B). +pmap(Fun, List, Timeout) when + is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0 +-> + nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout). + +%% @doc Delegate a function to a worker process. +%% The function may spawn_link other processes but we do not +%% want the caller process to be linked. +%% This is done by isolating the possible link with a not-linked +%% middleman process. +nolink_apply(Fun) -> nolink_apply(Fun, infinity). + +%% @doc Same as `nolink_apply/1', with a timeout. +-spec nolink_apply(function(), timer:timeout()) -> term(). +nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> + Caller = self(), + ResRef = make_ref(), + Middleman = erlang:spawn(make_middleman_fn(Caller, Fun, ResRef)), + receive + {ResRef, {normal, Result}} -> + Result; + {ResRef, {exception, {C, E, S}}} -> + erlang:raise(C, E, S); + {ResRef, {'EXIT', Reason}} -> + exit(Reason) + after Timeout -> + exit(Middleman, kill), + exit(timeout) + end. + +-spec make_middleman_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()). +make_middleman_fn(Caller, Fun, ResRef) -> + fun() -> + process_flag(trap_exit, true), + CallerMRef = erlang:monitor(process, Caller), + Worker = erlang:spawn_link(make_worker_fn(Caller, Fun, ResRef)), + receive + {'DOWN', CallerMRef, process, _, _} -> + %% For whatever reason, if the caller is dead, + %% there is no reason to continue + exit(Worker, kill), + exit(normal); + {'EXIT', Worker, normal} -> + exit(normal); + {'EXIT', Worker, Reason} -> + %% worker exited with some reason other than 'normal' + _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), + exit(normal) + end + end. + +-spec make_worker_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()). +make_worker_fn(Caller, Fun, ResRef) -> + fun() -> + Res = + try + {normal, Fun()} + catch + C:E:S -> + {exception, {C, E, S}} + end, + _ = erlang:send(Caller, {ResRef, Res}), + exit(normal) + end. + +do_parallel_map(Fun, List) -> + Parent = self(), + PidList = lists:map( + fun(Item) -> + erlang:spawn_link( + fun() -> + Res = + try + {normal, Fun(Item)} + catch + C:E:St -> + {exception, {C, E, St}} + end, + Parent ! {self(), Res} + end + ) + end, + List + ), + lists:foldr( + fun(Pid, Acc) -> + receive + {Pid, {normal, Result}} -> + [Result | Acc]; + {Pid, {exception, {C, E, St}}} -> + erlang:raise(C, E, St) + end + end, + [], + PidList + ). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/test/emqx_misc_SUITE.erl b/test/emqx_misc_SUITE.erl index 0eec55faa..e9dd3e132 100644 --- a/test/emqx_misc_SUITE.erl +++ b/test/emqx_misc_SUITE.erl @@ -146,3 +146,36 @@ t_now_to_secs(_) -> t_now_to_ms(_) -> ?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))). +t_pmap_normal(_) -> + ?assertEqual( + [5, 7, 9], + emqx_misc:pmap( + fun({A, B}) -> A + B end, + [{2, 3}, {3, 4}, {4, 5}] + ) + ). + +t_pmap_timeout(_) -> + ?assertExit( + timeout, + emqx_misc:pmap( + fun + (timeout) -> ct:sleep(1000); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, timeout], + 100 + ) + ). + +t_pmap_exception(_) -> + ?assertError( + foobar, + emqx_misc:pmap( + fun + (error) -> error(foobar); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, error] + ) + ).