diff --git a/.github/actions/detect-profiles/action.yaml b/.github/actions/detect-profiles/action.yaml index 555674c28..6ba638c98 100644 --- a/.github/actions/detect-profiles/action.yaml +++ b/.github/actions/detect-profiles/action.yaml @@ -1,8 +1,4 @@ name: 'Detect profiles' -inputs: - ci_git_token: - required: true - type: string outputs: profiles: description: 'Detected profiles' @@ -14,13 +10,18 @@ runs: - id: detect-profiles shell: bash run: | - git config --global --add safe.directory "$GITHUB_WORKSPACE" - if make emqx-ee --dry-run > /dev/null 2>&1; then - echo "::set-output name=profiles::[\"emqx-ee\"]" - echo "https://ci%40emqx.io:${{ inputs.ci_git_token }}@github.com" > $HOME/.git-credentials - git config --global credential.helper store + if [ -d source ]; then + ## source code downloaded + cd source + fi + if [ ! -d .git ]; then + echo "Not git dir, $(pwd)" + exit 1 + fi + if [ -f 'EMQX_ENTERPRISE' ]; then + echo "profiles=[\"emqx-ee\"]" >> $GITHUB_OUTPUT echo "EMQX_NAME=emqx-ee" >> $GITHUB_ENV else - echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]" + echo "profiles=[\"emqx\", \"emqx-edge\"]" >> $GITHUB_OUTPUT echo "EMQX_NAME=emqx" >> $GITHUB_ENV fi diff --git a/.github/actions/package-macos/action.yaml b/.github/actions/package-macos/action.yaml index 1a7e4fb9c..0624eb525 100644 --- a/.github/actions/package-macos/action.yaml +++ b/.github/actions/package-macos/action.yaml @@ -27,27 +27,23 @@ runs: shell: bash run: | brew update - brew install curl zip unzip gnu-sed kerl coreutils unixodbc freetds openssl@1.1 + brew install curl zip unzip gnu-sed coreutils unixodbc freetds openssl@1.1 echo "/usr/local/opt/bison/bin" >> $GITHUB_PATH echo "/usr/local/bin" >> $GITHUB_PATH - - uses: actions/cache@v2 + - uses: actions/cache@v3 id: cache with: - path: ~/.kerl/${{ inputs.otp }} + path: /opt/erlang/${{ inputs.otp }} key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit - restore-keys: | - otp-install-${{ inputs.otp }}-${{ inputs.os }} - name: build erlang if: steps.cache.outputs.cache-hit != 'true' shell: bash - env: - KERL_BUILD_BACKEND: git - OTP_GITHUB_URL: https://github.com/emqx/otp - KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit run: | - kerl update releases - kerl build ${{ inputs.otp }} - kerl install ${{ inputs.otp }} $HOME/.kerl/${{ inputs.otp }} + git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git $HOME/otp-${{ inputs.otp }} + cd $HOME/otp-${{ inputs.otp }} + ./configure --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit --prefix=/opt/erlang/${{ inputs.otp }} + make -j$(nproc) + sudo make install - name: build env: AUTO_INSTALL_BUILD_DEPS: 1 @@ -60,7 +56,7 @@ runs: APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ inputs.apple_developer_id_bundle_password }} shell: bash run: | - . $HOME/.kerl/${{ inputs.otp }}/activate + export PATH="/opt/erlang/${{ inputs.otp }}/bin:$PATH" make ensure-rebar3 sudo cp rebar3 /usr/local/bin/rebar3 make ${EMQX_NAME}-zip diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index 04d7074b3..88e1d9c9e 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -26,14 +26,12 @@ jobs: profiles: ${{ steps.detect-profiles.outputs.profiles}} steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 with: path: source - fetch-depth: 0 - - id: detect-profiles + - name: detect-profiles + id: detect-profiles uses: ./source/.github/actions/detect-profiles - with: - ci_git_token: ${{ secrets.CI_GIT_TOKEN }} - name: get_all_deps if: endsWith(github.repository, 'emqx') run: | @@ -97,10 +95,10 @@ jobs: echo "EQMX installed" ./_build/${{ matrix.profile }}/rel/emqx/bin/emqx uninstall echo "EQMX uninstaled" - - uses: actions/upload-artifact@v1 + - uses: actions/upload-artifact@v3 with: name: ${{ matrix.profile }}-windows - path: source/_packages/${{ matrix.profile }}/. + path: source/_packages/${{ matrix.profile }}/ mac: needs: prepare @@ -114,7 +112,7 @@ jobs: runs-on: ${{ matrix.os }} steps: - - uses: actions/download-artifact@v2 + - uses: actions/download-artifact@v3 with: name: source path: . @@ -123,10 +121,7 @@ jobs: ln -s . source unzip -q source.zip rm source source.zip - - id: detect-profiles - uses: ./.github/actions/detect-profiles - with: - ci_git_token: ${{ secrets.CI_GIT_TOKEN }} + - uses: ./.github/actions/detect-profiles - uses: ./.github/actions/package-macos with: otp: ${{ matrix.otp }} @@ -135,10 +130,10 @@ jobs: apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }} apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }} apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }} - - uses: actions/upload-artifact@v1 + - uses: actions/upload-artifact@v3 with: name: ${EMQX_NAME}-${{ matrix.otp }} - path: _packages/${EMQX_NAME}/. + path: _packages/${EMQX_NAME}/ linux: runs-on: ubuntu-20.04 @@ -214,7 +209,7 @@ jobs: - uses: actions/upload-artifact@v1 with: name: ${{ matrix.profile }}-${{ matrix.otp }} - path: source/_packages/${{ matrix.profile }}/. + path: source/_packages/${{ matrix.profile }}/ docker: runs-on: ubuntu-20.04 diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 7c0989bef..8935373a5 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -36,12 +36,15 @@ jobs: steps: - uses: actions/checkout@v1 - - uses: ./.github/actions/detect-profiles - with: - ci_git_token: ${{ secrets.CI_GIT_TOKEN }} - name: fix-git-unsafe-repository run: git config --global --add safe.directory /__w/emqx/emqx - - uses: actions/cache@v2 + - uses: ./.github/actions/detect-profiles + - name: ensure access to github + if: endsWith(github.repository, 'enterprise') + run: | + echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials + git config --global credential.helper store + - uses: actions/cache@v3 with: # dialyzer PLTs path: ~/.cache/rebar3/ @@ -54,7 +57,7 @@ jobs: run: make ${EMQX_NAME}-zip - name: build deb/rpm packages run: make ${EMQX_NAME}-pkg - - uses: actions/upload-artifact@v1 + - uses: actions/upload-artifact@v3 if: failure() with: name: rebar3.crashdump @@ -64,7 +67,7 @@ jobs: export CODE_PATH="$GITHUB_WORKSPACE" .ci/build_packages/tests.sh "${EMQX_NAME}" zip .ci/build_packages/tests.sh "${EMQX_NAME}" pkg - - uses: actions/upload-artifact@v2 + - uses: actions/upload-artifact@v3 with: name: ${{ matrix.os }} path: _packages/**/*.zip @@ -116,10 +119,13 @@ jobs: - macos-11 runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v3 + - name: ensure access to github + if: endsWith(github.repository, 'enterprise') + run: | + echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials + git config --global credential.helper store - uses: ./.github/actions/detect-profiles - with: - ci_git_token: ${{ secrets.CI_GIT_TOKEN }} - uses: ./.github/actions/package-macos with: otp: ${{ matrix.otp }} @@ -128,13 +134,12 @@ jobs: apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }} apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }} apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }} - - uses: actions/upload-artifact@v1 + - uses: actions/upload-artifact@v3 if: failure() with: name: rebar3.crashdump path: ./rebar3.crashdump - - - uses: actions/upload-artifact@v2 + - uses: actions/upload-artifact@v3 with: name: macos path: _packages/**/*.zip diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 6f3c829a9..a7f3cbe6c 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -13,14 +13,9 @@ jobs: profiles: ${{ steps.detect-profiles.outputs.profiles}} steps: - - uses: actions/checkout@v2 - with: - path: source - fetch-depth: 0 + - uses: actions/checkout@v3 - id: detect-profiles - uses: ./source/.github/actions/detect-profiles - with: - ci_git_token: ${{ secrets.CI_GIT_TOKEN }} + uses: ./.github/actions/detect-profiles upload: runs-on: ubuntu-20.04 @@ -59,12 +54,10 @@ jobs: -X POST \ -d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ github.ref_name }}\" }" \ ${{ secrets.EMQX_IO_RELEASE_API }} - - uses: actions/checkout@v2 - with: - fetch-depth: 0 + - uses: actions/checkout@v3 - name: get version id: version - run: echo "::set-output name=version::$(./pkg-vsn.sh)" + run: echo "version=$(./pkg-vsn.sh)" >> $GITHUB_OUTPUT - uses: emqx/push-helm-action@v1 if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx' with: diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 8bff3a33c..0a71b3c9c 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -10,140 +10,255 @@ on: pull_request: jobs: - run_proper_test: - runs-on: ubuntu-20.04 - container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04 + prepare: + runs-on: ubuntu-20.04 + container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04 + outputs: + ct_apps: ${{ steps.run_find_apps.outputs.ct_apps }} + steps: + - uses: actions/checkout@v3 + with: + path: source + fetch-depth: 0 + - name: git credentials + run: | + if make emqx-ee --dry-run > /dev/null 2>&1; then + echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials + git config --global credential.helper store + fi + - name: find_ct_apps + working-directory: source + id: run_find_apps + # emqx_plugin_libs doesn't have a test suite -> excluded from app list + # emqx ct is run independently -> exclude it from the app list + run: | + ct_apps="$(./scripts/find-apps.sh --json | jq -c 'del (.[] | select (. == "apps/emqx_plugin_libs" or . == "emqx"))')" + echo "ct-apps: $ct_apps" + echo "ct_apps=$ct_apps" >> $GITHUB_OUTPUT + - name: get_all_deps + working-directory: source + run: | + make deps-all + ./rebar3 as test compile + cd .. + zip -ryq source.zip source/* source/.[^.]* + - uses: actions/upload-artifact@v3 + with: + name: source + path: source.zip - steps: - - uses: actions/checkout@v2 - - name: set git credentials - run: | - if make emqx-ee --dry-run > /dev/null 2>&1; then - echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials - git config --global credential.helper store - fi - - name: proper - run: make proper + eunit_and_proper: + needs: prepare + runs-on: ubuntu-20.04 + container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04 + strategy: + fail-fast: false + matrix: + task: + - eunit + - proper + steps: + - uses: AutoModality/action-clean@v1 + - uses: actions/download-artifact@v3 + with: + name: source + path: . + - name: unzip source code + run: unzip -o -q source.zip + # produces eunit.coverdata and proper.coverdata + - name: eunit and proper + working-directory: source + run: make ${{ matrix.task }} + - uses: actions/upload-artifact@v3 + with: + name: cover + path: source/_build/test/cover + if-no-files-found: warn - run_common_test: - runs-on: ${{ matrix.runs-on }} - strategy: - fail-fast: false - matrix: - runs-on: - - aws-amd64 - - ubuntu-20.04 - use-self-hosted: - - ${{ github.repository_owner == 'emqx' }} - exclude: - - runs-on: ubuntu-20.04 - use-self-hosted: true - - runs-on: aws-amd64 - use-self-hosted: false - steps: - - uses: actions/checkout@v2 - # to avoid dirty self-hosted runners - - name: stop containers - run: | - docker rm -f $(docker ps -qa) || true - docker network rm $(docker network ls -q) || true - - name: docker compose up - if: endsWith(github.repository, 'emqx') - env: - MYSQL_TAG: 8 - REDIS_TAG: 6 - MONGO_TAG: 4 - PGSQL_TAG: 13 - LDAP_TAG: 2.4.50 - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: | - docker-compose \ - -f .ci/docker-compose-file/docker-compose.yaml \ - -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ - -f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \ - up -d --build - - name: docker compose up - if: endsWith(github.repository, 'emqx-enterprise') - env: - MYSQL_TAG: 8 - REDIS_TAG: 6 - MONGO_TAG: 4 - PGSQL_TAG: 13 - LDAP_TAG: 2.4.50 - OPENTSDB_TAG: latest - INFLUXDB_TAG: 1.7.6 - DYNAMODB_TAG: 1.11.477 - TIMESCALE_TAG: latest-pg11 - CASSANDRA_TAG: 3.11.6 - RABBITMQ_TAG: 3.7 - KAFKA_TAG: 2.5.0 - PULSAR_TAG: 2.3.2 - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - timeout-minutes: 20 - run: | - docker-compose \ - -f .ci/docker-compose-file/docker-compose.yaml \ - -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ - -f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise-cassandra-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise-dynamodb-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise-influxdb-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise-kafka-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise-opentsdb-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise-pulsar-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise-rabbit-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise-timescale-tcp.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise-mysql-client.yaml \ - -f .ci/docker-compose-file/docker-compose-enterprise-pgsql-and-timescale-client.yaml \ - up -d --build - docker exec -i erlang bash -c "echo \"https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com\" > /root/.git-credentials && git config --global credential.helper store" - docker exec -i erlang bash -c "git config --global --add safe.directory /emqx" - while [ $(docker ps -a --filter name=client --filter exited=0 | wc -l) \ - != $(docker ps -a --filter name=client | wc -l) ]; do - sleep 5 - done - - name: run eunit - run: | - docker exec -i erlang bash -c "make eunit" - - name: run common test - run: | - docker exec -i erlang bash -c "make ct" - - name: run cover - run: | - printenv > .env - docker exec -i erlang bash -c "git config --global --add safe.directory /emqx" - docker exec -i erlang bash -c "make cover" - docker exec --env-file .env -i erlang bash -c "make coveralls" - - name: cat rebar.crashdump - if: failure() - run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi - - uses: actions/upload-artifact@v1 - if: failure() - with: - name: logs - path: _build/test/logs - - uses: actions/upload-artifact@v1 - with: - name: cover - path: _build/test/cover + emqx_ct: + needs: prepare + runs-on: ubuntu-20.04 + container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04 + strategy: + fail-fast: false + steps: + - uses: AutoModality/action-clean@v1 + - uses: actions/download-artifact@v3 + with: + name: source + path: . + - name: unzip source code + run: unzip -o -q source.zip + # produces emqx-emqx.coverdata + - name: emqx-ct-pipeline + working-directory: source + run: make emqx-ct-pipeline + - uses: actions/upload-artifact@v3 + with: + name: cover + path: source/_build/test/cover + if-no-files-found: warn - finish: - needs: run_common_test - runs-on: ubuntu-20.04 - steps: - - name: Coveralls Finished - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: | - curl -v -k https://coveralls.io/webhook \ - --header "Content-Type: application/json" \ - --data "{\"repo_name\":\"$GITHUB_REPOSITORY\",\"repo_token\":\"$GITHUB_TOKEN\",\"payload\":{\"build_num\":$GITHUB_RUN_ID,\"status\":\"done\"}}" || true + ct: + needs: prepare + runs-on: ${{ matrix.runs-on }} + strategy: + max-parallel: 12 + fail-fast: false + matrix: + app_name: ${{ fromJson(needs.prepare.outputs.ct_apps) }} + runs-on: + - aws-amd64 + - ubuntu-20.04 + use-self-hosted: + - ${{ github.repository_owner == 'emqx' }} + exclude: + - runs-on: ubuntu-20.04 + use-self-hosted: true + - runs-on: aws-amd64 + use-self-hosted: false + steps: + - uses: AutoModality/action-clean@v1 + - uses: actions/download-artifact@v3 + with: + name: source + path: . + - name: unzip source code + run: unzip -q source.zip + # to avoid dirty self-hosted runners + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true + - name: docker compose up + working-directory: source + if: endsWith(github.repository, 'emqx') + env: + MYSQL_TAG: 8 + REDIS_TAG: 6 + MONGO_TAG: 4 + PGSQL_TAG: 13 + LDAP_TAG: 2.4.50 + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + docker-compose \ + -f .ci/docker-compose-file/docker-compose.yaml \ + -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ + -f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \ + up -d --build + docker exec -i erlang bash -c "git config --global --add safe.directory /emqx" + - name: docker compose up + working-directory: source + if: endsWith(github.repository, 'emqx-enterprise') + env: + MYSQL_TAG: 8 + REDIS_TAG: 6 + MONGO_TAG: 4 + PGSQL_TAG: 13 + LDAP_TAG: 2.4.50 + OPENTSDB_TAG: latest + INFLUXDB_TAG: 1.7.6 + DYNAMODB_TAG: 1.11.477 + TIMESCALE_TAG: latest-pg11 + CASSANDRA_TAG: 3.11.6 + RABBITMQ_TAG: 3.7 + KAFKA_TAG: 2.5.0 + PULSAR_TAG: 2.3.2 + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + timeout-minutes: 20 + run: | + docker-compose \ + -f .ci/docker-compose-file/docker-compose.yaml \ + -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ + -f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise-cassandra-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise-dynamodb-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise-influxdb-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise-kafka-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise-opentsdb-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise-pulsar-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise-rabbit-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise-timescale-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise-mysql-client.yaml \ + -f .ci/docker-compose-file/docker-compose-enterprise-pgsql-and-timescale-client.yaml \ + up -d --build + docker exec -i erlang bash -c "echo \"https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com\" > /root/.git-credentials && git config --global credential.helper store" + docker exec -i erlang bash -c "git config --global --add safe.directory /emqx" + while [ $(docker ps -a --filter name=client --filter exited=0 | wc -l) \ + != $(docker ps -a --filter name=client | wc -l) ]; do + sleep 5 + done + - name: run common test + run: docker exec -i erlang bash -c "make ${{ matrix.app_name }}-ct-pipeline" + - name: cat rebar.crashdump + if: failure() + working-directory: source + run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi + - name: set log file name + if: failure() + run: echo "LOGFILENAME=logs-$(echo ${{ matrix.app_name }} | tr '/' '_')" >> $GITHUB_ENV + - uses: actions/upload-artifact@v3 + if: failure() + with: + name: ${{ env.LOGFILENAME }} + path: source/_build/test/logs + if-no-files-found: warn + - uses: actions/upload-artifact@v3 + with: + name: cover + path: source/_build/test/cover + if-no-files-found: warn + + make_cover: + needs: + - eunit_and_proper + - emqx_ct + - ct + runs-on: ubuntu-20.04 + container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04 + steps: + - uses: AutoModality/action-clean@v1 + - uses: actions/download-artifact@v3 + with: + name: source + path: . + - name: unzip source code + run: unzip -q source.zip + - uses: actions/download-artifact@v3 + name: download cover data + with: + name: cover + path: source/_build/test/cover + - name: make cover + working-directory: source + run: make cover + - name: send to coveralls + working-directory: source + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: make coveralls + - name: get coveralls logs + working-directory: source + if: failure() + run: cat rebar3.crashdump + + finish: + needs: make_cover + runs-on: ubuntu-20.04 + steps: + - name: Coveralls Finished + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + curl -v -k https://coveralls.io/webhook \ + --header "Content-Type: application/json" \ + --data "{\"repo_name\":\"$GITHUB_REPOSITORY\",\"repo_token\":\"$GITHUB_TOKEN\",\"payload\":{\"build_num\":$GITHUB_RUN_ID,\"status\":\"done\"}}" || true diff --git a/Makefile b/Makefile index 84cfcda5e..95bb522c7 100644 --- a/Makefile +++ b/Makefile @@ -62,6 +62,14 @@ $1-ct: $(REBAR) endef $(foreach app,$(APPS),$(eval $(call gen-app-ct-target,$(app)))) +## app/name-ct-pipeline targets are used in pipeline -> make cover data for each app +.PHONY: $(APPS:%=%-ct-pipeline) +define gen-app-ct-target-pipeline +$1-ct-pipeline: $(REBAR) + $(REBAR) ct --name 'test@127.0.0.1' -c -v --cover_export_name $(PROFILE)-$(subst /,-,$1) --suite $(shell $(CURDIR)/scripts/find-suites.sh $1) +endef +$(foreach app,$(APPS),$(eval $(call gen-app-ct-target-pipeline,$(app)))) + ## apps/name-prop targets .PHONY: $(APPS:%=%-prop) define gen-app-prop-target diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 1739bd47a..e45903113 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -40,6 +40,7 @@ -define(RESOURCE_TYPE_MQTT, 'bridge_mqtt'). -define(RESOURCE_TYPE_RPC, 'bridge_rpc'). +-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish). -define(RESOURCE_CONFIG_SPEC_MQTT, #{ address => #{ @@ -494,7 +495,7 @@ on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName, PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), TopicTks = case ForwardTopic == <<"">> of true -> undefined; - false -> emqx_rule_utils:preproc_tmpl(ForwardTopic) + false -> emqx_rule_utils:preproc_tmpl(assert_topic_valid(ForwardTopic)) end, Opts. @@ -515,7 +516,7 @@ on_action_data_to_mqtt_broker(Msg, _Env = qos = QoS, from = From, flags = Flags, - topic = Topic1, + topic = assert_topic_valid(Topic1), payload = format_data(PayloadTks, Msg), timestamp = TimeStamp}, ecpool:with_client(PoolName, @@ -583,7 +584,7 @@ options(Options, PoolName, ResId) -> Get = fun(Key) -> GetD(Key, undefined) end, Address = Get(<<"address">>), [{max_inflight_batches, 32}, - {forward_mountpoint, str(Get(<<"mountpoint">>))}, + {forward_mountpoint, str(assert_topic_valid(Get(<<"mountpoint">>)))}, {disk_cache, cuttlefish_flag:parse(str(GetD(<<"disk_cache">>, "off")))}, {start_type, auto}, {reconnect_delay_ms, cuttlefish_duration:parse(str(Get(<<"reconnect_interval">>)), ms)}, @@ -610,6 +611,12 @@ options(Options, PoolName, ResId) -> | maybe_ssl(Options, Get(<<"ssl">>), ResId)] end. +assert_topic_valid(T) -> + case emqx_topic:wildcard(T) of + true -> throw({?BAD_TOPIC_WITH_WILDCARD, T}); + false -> T + end. + maybe_ssl(_Options, false, _ResId) -> []; maybe_ssl(Options, true, ResId) -> diff --git a/apps/emqx_management/etc/emqx_management.conf b/apps/emqx_management/etc/emqx_management.conf index 1dbfc1583..ef6491b25 100644 --- a/apps/emqx_management/etc/emqx_management.conf +++ b/apps/emqx_management/etc/emqx_management.conf @@ -22,7 +22,7 @@ management.default_application.secret = public ## Initialize apps file ## Is used to add administrative app/secrets when EMQX is launched for the first time. -## This config will not take any effect once EMQX database is populated with the provided apps. +## This config will not take any effect once EMQX database has one or more apps. ## The file content format is as below: ## ``` ##819e5db182cf:l9C5suZClIF3FvdzWqmINrVU61WNfIjcglxw9CVM7y1VI diff --git a/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl b/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl index c3921140a..7d7a7245c 100644 --- a/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl +++ b/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl @@ -32,8 +32,11 @@ init_per_suite(Cfg) -> ok = emqx_dashboard_admin:mnesia(boot), application:load(emqx_modules), application:load(emqx_bridge_mqtt), + ekka_mnesia:start(), + emqx_dashboard_admin:mnesia(boot), emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]), application:ensure_all_started(emqx_dashboard), + ok = emqx_rule_engine:load_providers(), Cfg. end_per_suite(Cfg) -> diff --git a/apps/emqx_management/test/emqx_mgmt_bootstrap_app_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_bootstrap_app_SUITE.erl index 7748c0224..e3894beb1 100644 --- a/apps/emqx_management/test/emqx_mgmt_bootstrap_app_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_bootstrap_app_SUITE.erl @@ -41,6 +41,8 @@ init_per_suite(Config) -> Config. end_per_suite(_) -> + ok = application:unset_env(emqx_management, bootstrap_apps_file), + _ = mnesia:clear_table(mqtt_app), emqx_ct_helpers:stop_apps([]), ok. diff --git a/apps/emqx_rule_engine/include/rule_actions.hrl b/apps/emqx_rule_engine/include/rule_actions.hrl index 4571849d3..cd47e2123 100644 --- a/apps/emqx_rule_engine/include/rule_actions.hrl +++ b/apps/emqx_rule_engine/include/rule_actions.hrl @@ -28,3 +28,5 @@ -define(bound_v(Key, ENVS0), maps:get(Key, maps:get(?BINDING_KEYS, ENVS0, #{}))). + +-define(JWT_TABLE, emqx_rule_engine_jwt_table). diff --git a/apps/emqx_rule_engine/rebar.config b/apps/emqx_rule_engine/rebar.config index f9ad1b283..5154decf5 100644 --- a/apps/emqx_rule_engine/rebar.config +++ b/apps/emqx_rule_engine/rebar.config @@ -1,5 +1,6 @@ %% -*- mode: erlang -*- -{deps, []}. +{deps, [ {jose, {git, "https://github.com/emqx/erlang-jose", {tag, "emqx-1.11.3"}}} + ]}. %% Comple Opts {erl_opts, [warn_unused_vars, diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 557ddf423..67f68c99b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -22,6 +22,8 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish). + -define(REPUBLISH_PARAMS_SPEC, #{ target_topic => #{ order => 1, @@ -163,7 +165,7 @@ on_action_create_republish(Id, Params = #{ }) -> TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), TargetQoS = to_qos(TargetQoS0), - TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic), + TopicTks = emqx_rule_utils:preproc_tmpl(assert_topic_valid(TargetTopic)), PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), Params. @@ -201,7 +203,7 @@ on_action_republish(Selected, _Envs = #{ from = ActId, flags = Flags#{retain => get_retain(TargetRetain, Selected)}, headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)), payload = format_msg(PayloadTks, Selected), timestamp = Timestamp }, @@ -226,7 +228,7 @@ on_action_republish(Selected, _Envs = #{ from = ActId, flags = #{dup => false, retain => get_retain(TargetRetain, Selected)}, headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)), payload = format_msg(PayloadTks, Selected), timestamp = erlang:system_time(millisecond) }, @@ -270,6 +272,12 @@ get_qos(-1, _Data, Default) -> Default; get_qos(TargetQoS, Data, _Default) -> qos(emqx_rule_utils:replace_var(TargetQoS, Data)). +assert_topic_valid(T) -> + case emqx_topic:wildcard(T) of + true -> throw({?BAD_TOPIC_WITH_WILDCARD, T}); + false -> T + end. + qos(<<"0">>) -> 0; qos(<<"1">>) -> 1; qos(<<"2">>) -> 2; diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index b02edb456..31e08f10f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,8 +2,8 @@ [{description, "EMQ X Rule Engine"}, {vsn, "4.4.11"}, % strict semver, bump manually! {modules, []}, - {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, - {applications, [kernel,stdlib,rulesql,getopt]}, + {registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]}, + {applications, [kernel,stdlib,rulesql,getopt,jose]}, {mod, {emqx_rule_engine_app, []}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index c877c23a2..1ef25a529 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,7 +2,13 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.10", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, @@ -12,7 +18,13 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.9", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, @@ -23,7 +35,13 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, @@ -35,7 +53,12 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.[6-7]">>, - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -48,7 +71,12 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.5", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -62,7 +90,12 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -76,7 +109,12 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {"4.4.3", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -92,7 +130,12 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.2", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -109,7 +152,12 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.1", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -126,7 +174,12 @@ {add_module,emqx_rule_date}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.0", - [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + [{add_module,emqx_rule_engine_jwt}, + {add_module,emqx_rule_engine_jwt_worker}, + {add_module,emqx_rule_engine_jwt_sup}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_engine_sup,start_jwt_sup,[]}}, + {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -144,42 +197,64 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.10", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.9", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {<<"4\\.4\\.[6-7]">>, [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -188,11 +263,17 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.5", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -202,11 +283,17 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.4", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -216,10 +303,16 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.3", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, @@ -232,10 +325,16 @@ {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.2", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -249,10 +348,16 @@ {delete_module,emqx_rule_date}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.1", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -266,10 +371,16 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {"4.4.0", [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -283,5 +394,10 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, - {delete_module,emqx_rule_date}]}, + {apply,{supervisor,terminate_child, + [emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}}, + {delete_module,emqx_rule_date}, + {delete_module,emqx_rule_engine_jwt_sup}, + {delete_module,emqx_rule_engine_jwt_worker}, + {delete_module,emqx_rule_engine_jwt}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 24dc58a97..e642932c2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -277,7 +277,7 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) -> _ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok, init_resource, InitArgs) catch throw : Reason -> - ?LOG(error, "create_resource failed: ~0p", [Reason]) + ?LOG_SENSITIVE(warning, "create_resource failed: ~0p", [Reason]) end, {ok, Resource}; no_retry -> @@ -285,6 +285,7 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) -> _ = ?CLUSTER_CALL(init_resource, InitArgs), {ok, Resource} catch throw : Reason -> + ?LOG_SENSITIVE(error, "create_resource failed: ~0p", [Reason]), {error, Reason} end end; diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl new file mode 100644 index 000000000..828c77f93 --- /dev/null +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl @@ -0,0 +1,45 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_jwt). + +-include("rule_engine.hrl"). +-include("rule_actions.hrl"). + +%% API +-export([ lookup_jwt/1 + , lookup_jwt/2 + ]). + +-type jwt() :: binary(). + +-spec lookup_jwt(resource_id()) -> {ok, jwt()} | {error, not_found}. +lookup_jwt(ResourceId) -> + ?MODULE:lookup_jwt(?JWT_TABLE, ResourceId). + +-spec lookup_jwt(ets:table(), resource_id()) -> {ok, jwt()} | {error, not_found}. +lookup_jwt(TId, ResourceId) -> + try + case ets:lookup(TId, {ResourceId, jwt}) of + [{{ResourceId, jwt}, JWT}] -> + {ok, JWT}; + [] -> + {error, not_found} + end + catch + error:badarg -> + {error, not_found} + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl new file mode 100644 index 000000000..b393dd08b --- /dev/null +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl @@ -0,0 +1,89 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_jwt_sup). + +-behaviour(supervisor). + +-export([ start_link/0 + , ensure_worker_present/2 + , ensure_worker_deleted/1 + ]). + +-export([init/1]). + +-include_lib("emqx_rule_engine/include/rule_actions.hrl"). + +-type worker_id() :: term(). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + ensure_jwt_table(), + SupFlags = #{ strategy => one_for_one + , intensity => 10 + , period => 5 + , auto_shutdown => never + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%% @doc Starts a new JWT worker. The caller should use +%% `emqx_rule_engine_jwt_sup:ensure_jwt/1' to ensure that a JWT has +%% been stored, if synchronization is needed. +-spec ensure_worker_present(worker_id(), map()) -> + {ok, supervisor:child()}. +ensure_worker_present(Id, Config) -> + ChildSpec = jwt_worker_child_spec(Id, Config), + case supervisor:start_child(?MODULE, ChildSpec) of + {ok, Pid} -> + {ok, Pid}; + {error, {already_started, Pid}} -> + {ok, Pid}; + {error, already_present} -> + supervisor:restart_child(?MODULE, Id) + end. + +%% @doc Stops a given JWT worker by its id. +-spec ensure_worker_deleted(worker_id()) -> ok. +ensure_worker_deleted(Id) -> + case supervisor:terminate_child(?MODULE, Id) of + ok -> ok; + {error, not_found} -> ok + end. + +jwt_worker_child_spec(Id, Config) -> + #{ id => Id + , start => {emqx_rule_engine_jwt_worker, start_link, [Config]} + , restart => transient + , type => worker + , significant => false + , shutdown => brutal_kill + , modules => [emqx_rule_engine_jwt_worker] + }. + +-spec ensure_jwt_table() -> ok. +ensure_jwt_table() -> + case ets:whereis(?JWT_TABLE) of + undefined -> + Opts = [named_table, public, + {read_concurrency, true}, ordered_set], + _ = ets:new(?JWT_TABLE, Opts), + ok; + _ -> + ok + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl new file mode 100644 index 000000000..4190a3536 --- /dev/null +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl @@ -0,0 +1,216 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_jwt_worker). + +-behaviour(gen_server). + +%% API +-export([ start_link/1 + , ensure_jwt/1 + ]). + +%% gen_server API +-export([ init/1 + , handle_continue/2 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , format_status/1 + , format_status/2 + ]). + +-include_lib("jose/include/jose_jwk.hrl"). +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). +-include_lib("emqx_rule_engine/include/rule_actions.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-type config() :: #{ private_key := binary() + , resource_id := resource_id() + , expiration := timer:time() + , table := ets:table() + , iss := binary() + , sub := binary() + , aud := binary() + , kid := binary() + , alg := binary() + }. +-type jwt() :: binary(). +-type state() :: #{ refresh_timer := undefined | timer:tref() + , resource_id := resource_id() + , expiration := timer:time() + , table := ets:table() + , jwt := undefined | jwt() + %% only undefined during startup + , jwk := undefined | jose_jwk:key() + , iss := binary() + , sub := binary() + , aud := binary() + , kid := binary() + , alg := binary() + }. + +-define(refresh_jwt, refresh_jwt). + +%%----------------------------------------------------------------------------------------- +%% API +%%----------------------------------------------------------------------------------------- + +-spec start_link(config()) -> gen_server:start_ret(). +start_link(#{ private_key := _ + , expiration := _ + , resource_id := _ + , table := _ + , iss := _ + , sub := _ + , aud := _ + , kid := _ + , alg := _ + } = Config) -> + gen_server:start_link(?MODULE, Config, []). + +-spec ensure_jwt(pid()) -> reference(). +ensure_jwt(Worker) -> + Ref = alias([reply]), + gen_server:cast(Worker, {ensure_jwt, Ref}), + Ref. + +%%----------------------------------------------------------------------------------------- +%% gen_server API +%%----------------------------------------------------------------------------------------- + +-spec init(config()) -> {ok, state(), {continue, {make_key, binary()}}} + | {stop, {error, term()}}. +init(#{private_key := PrivateKeyPEM} = Config) -> + State0 = maps:without([private_key], Config), + State = State0#{ jwk => undefined + , jwt => undefined + , refresh_timer => undefined + }, + {ok, State, {continue, {make_key, PrivateKeyPEM}}}. + +handle_continue({make_key, PrivateKeyPEM}, State0) -> + case jose_jwk:from_pem(PrivateKeyPEM) of + JWK = #jose_jwk{} -> + State = State0#{jwk := JWK}, + {noreply, State, {continue, create_token}}; + [] -> + ?tp(rule_engine_jwt_worker_startup_error, #{error => empty_key}), + {stop, {shutdown, {error, empty_key}}, State0}; + {error, Reason} -> + Error = {invalid_private_key, Reason}, + ?tp(rule_engine_jwt_worker_startup_error, #{error => Error}), + {stop, {shutdown, {error, Error}}, State0}; + Error0 -> + Error = {invalid_private_key, Error0}, + ?tp(rule_engine_jwt_worker_startup_error, #{error => Error}), + {stop, {shutdown, {error, Error}}, State0} + end; +handle_continue(create_token, State0) -> + State = generate_and_store_jwt(State0), + {noreply, State}. + +handle_call(_Req, _From, State) -> + {reply, {error, bad_call}, State}. + +handle_cast({ensure_jwt, From}, State0 = #{jwt := JWT}) -> + State = + case JWT of + undefined -> + generate_and_store_jwt(State0); + _ -> + State0 + end, + From ! {From, token_created}, + {noreply, State}; +handle_cast(_Req, State) -> + {noreply, State}. + +handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) -> + State = generate_and_store_jwt(State0), + {noreply, State}; +handle_info(_Msg, State) -> + {noreply, State}. + +format_status(State) -> + censor_secrets(State). + +format_status(_Opt, [_PDict, State0]) -> + State = censor_secrets(State0), + [{data, [{"State", State}]}]. + +%%----------------------------------------------------------------------------------------- +%% Helper fns +%%----------------------------------------------------------------------------------------- + +-spec do_generate_jwt(state()) -> jwt(). +do_generate_jwt(#{ expiration := ExpirationMS + , iss := Iss + , sub := Sub + , aud := Aud + , kid := KId + , alg := Alg + , jwk := JWK + } = _State) -> + Headers = #{ <<"alg">> => Alg + , <<"kid">> => KId + }, + Now = erlang:system_time(seconds), + ExpirationS = erlang:convert_time_unit(ExpirationMS, millisecond, second), + Claims = #{ <<"iss">> => Iss + , <<"sub">> => Sub + , <<"aud">> => Aud + , <<"iat">> => Now + , <<"exp">> => Now + ExpirationS + }, + JWT0 = jose_jwt:sign(JWK, Headers, Claims), + {_, JWT} = jose_jws:compact(JWT0), + JWT. + +-spec generate_and_store_jwt(state()) -> state(). +generate_and_store_jwt(State0) -> + JWT = do_generate_jwt(State0), + store_jwt(State0, JWT), + ?tp(rule_engine_jwt_worker_refresh, #{jwt => JWT}), + State1 = State0#{jwt := JWT}, + ensure_timer(State1). + +-spec store_jwt(state(), jwt()) -> ok. +store_jwt(#{resource_id := ResourceId, table := TId}, JWT) -> + true = ets:insert(TId, {{ResourceId, jwt}, JWT}), + ?tp(rule_engine_jwt_worker_token_stored, #{resource_id => ResourceId}), + ok. + +-spec ensure_timer(state()) -> state(). +ensure_timer(State = #{ refresh_timer := undefined + , expiration := ExpirationMS0 + }) -> + ExpirationMS = max(5_000, ExpirationMS0 - 5_000), + TRef = erlang:start_timer(ExpirationMS, self(), ?refresh_jwt), + State#{refresh_timer => TRef}; +ensure_timer(State) -> + State. + +-spec censor_secrets(state()) -> map(). +censor_secrets(State) -> + maps:map( + fun(Key, _Value) when Key =:= jwt; + Key =:= jwk -> + "******"; + (_Key, Value) -> + Value + end, + State). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 03e3fd11a..087dbcbfb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -22,7 +22,9 @@ -export([start_link/0]). --export([start_locker/0]). +-export([ start_locker/0 + , start_jwt_sup/0 + ]). -export([init/1]). @@ -31,8 +33,12 @@ start_link() -> init([]) -> Opts = [public, named_table, set, {read_concurrency, true}], - _ = ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]), - _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), + ensure_table(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]), + ensure_table(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), + SupFlags = #{ strategy => one_for_one + , intensity => 10 + , period => 10 + }, Registry = #{id => emqx_rule_registry, start => {emqx_rule_registry, start_link, []}, restart => permanent, @@ -51,7 +57,8 @@ init([]) -> shutdown => 5000, type => worker, modules => [emqx_rule_monitor]}, - {ok, {{one_for_one, 10, 10}, [Registry, Metrics, Monitor]}}. + JWTSup = jwt_sup_child_spec(), + {ok, {SupFlags, [Registry, Metrics, Monitor, JWTSup]}}. start_locker() -> Locker = #{id => emqx_rule_locker, @@ -61,3 +68,32 @@ start_locker() -> type => worker, modules => [emqx_rule_locker]}, supervisor:start_child(?MODULE, Locker). + +start_jwt_sup() -> + JWTSup = jwt_sup_child_spec(), + supervisor:start_child(?MODULE, JWTSup). + +jwt_sup_child_spec() -> + #{ id => emqx_rule_engine_jwt_sup + , start => {emqx_rule_engine_jwt_sup, start_link, []} + , type => supervisor + , restart => permanent + , shutdown => 5_000 + , modules => [emqx_rule_engine_jwt_sup] + }. + +ensure_table(Name, Opts) -> + try + case ets:whereis(name) of + undefined -> + _ = ets:new(Name, Opts), + ok; + _ -> + ok + end + catch + %% stil the table exists (somehow can happen in hot-upgrade, + %% it seems). + error:badarg -> + ok + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl index 2cebf96e5..4d8de9cff 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include("rule_engine.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API functions -export([ start_link/0 @@ -222,7 +223,9 @@ inc(Id, Metric, Val) -> counters:add(couters_ref(Id), metrics_idx(Metric), Val); Ref -> counters:add(Ref, metrics_idx(Metric), Val) - end. + end, + ?tp(rule_metrics_inc, #{id => Id, metric => Metric, value => Val}), + ok. inc_actions_taken(Id) -> inc_actions_taken(Id, 1). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl new file mode 100644 index 000000000..fc84293e3 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl @@ -0,0 +1,248 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_jwt_worker_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). +-include_lib("jose/include/jose_jwt.hrl"). +-include_lib("jose/include/jose_jws.hrl"). + +-compile([export_all, nowarn_export_all]). + +%%----------------------------------------------------------------------------- +%% CT boilerplate +%%----------------------------------------------------------------------------- + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +%%----------------------------------------------------------------------------- +%% Helper fns +%%----------------------------------------------------------------------------- + +generate_private_key_pem() -> + PublicExponent = 65537, + Size = 2048, + Key = public_key:generate_key({rsa, Size, PublicExponent}), + DERKey = public_key:der_encode('PrivateKeyInfo', Key), + public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]). + +generate_config() -> + PrivateKeyPEM = generate_private_key_pem(), + ResourceID = emqx_guid:gen(), + #{ private_key => PrivateKeyPEM + , expiration => timer:hours(1) + , resource_id => ResourceID + , table => ets:new(test_jwt_table, [ordered_set, public]) + , iss => <<"issuer">> + , sub => <<"subject">> + , aud => <<"audience">> + , kid => <<"key id">> + , alg => <<"RS256">> + }. + +is_expired(JWT) -> + #jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT), + Now = erlang:system_time(seconds), + Now >= Exp. + +%%----------------------------------------------------------------------------- +%% Test cases +%%----------------------------------------------------------------------------- + +t_create_success(_Config) -> + Config = generate_config(), + Res = emqx_rule_engine_jwt_worker:start_link(Config), + ?assertMatch({ok, _}, Res), + {ok, Worker} = Res, + Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Worker), + receive + {Ref, token_created} -> + ok + after + 1_000 -> + ct:fail("should have confirmed token creation; msgs: ~0p", + [process_info(self(), messages)]) + end, + ok. + +t_empty_key(_Config) -> + Config0 = generate_config(), + Config = Config0#{private_key := <<>>}, + process_flag(trap_exit, true), + ?check_trace( + ?wait_async_action( + ?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config)), + #{?snk_kind := rule_engine_jwt_worker_startup_error}, + 1_000), + fun(Trace) -> + ?assertMatch([#{error := empty_key}], + ?of_kind(rule_engine_jwt_worker_startup_error, Trace)), + ok + end), + ok. + +t_invalid_pem(_Config) -> + Config0 = generate_config(), + InvalidPEM = public_key:pem_encode([{'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted}, + {'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted}]), + Config = Config0#{private_key := InvalidPEM}, + process_flag(trap_exit, true), + ?check_trace( + ?wait_async_action( + ?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config)), + #{?snk_kind := rule_engine_jwt_worker_startup_error}, + 1_000), + fun(Trace) -> + ?assertMatch([#{error := {invalid_private_key, _}}], + ?of_kind(rule_engine_jwt_worker_startup_error, Trace)), + ok + end), + ok. + +t_refresh(_Config) -> + Config0 = #{ table := Table + , resource_id := ResourceId + } = generate_config(), + Config = Config0#{expiration => 5_000}, + ?check_trace( + begin + {{ok, _Pid}, {ok, _Event}} = + ?wait_async_action( + emqx_rule_engine_jwt_worker:start_link(Config), + #{?snk_kind := rule_engine_jwt_worker_token_stored}, + 5_000), + {ok, FirstJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId), + ?block_until(#{?snk_kind := rule_engine_jwt_worker_refresh, + jwt := JWT0} when JWT0 =/= FirstJWT, 15_000), + {ok, SecondJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId), + ?assertNot(is_expired(SecondJWT)), + ?assert(is_expired(FirstJWT)), + {FirstJWT, SecondJWT} + end, + fun({FirstJWT, SecondJWT}, Trace) -> + ?assertMatch([_, _ | _], + ?of_kind(rule_engine_jwt_worker_token_stored, Trace)), + ?assertNotEqual(FirstJWT, SecondJWT), + ok + end), + ok. + +t_format_status(_Config) -> + Config = generate_config(), + {ok, Pid} = emqx_rule_engine_jwt_worker:start_link(Config), + {status, _, _, Props} = sys:get_status(Pid), + [State] = [State + || Info = [_ | _] <- Props, + {data, Data = [_ | _]} <- Info, + {"State", State} <- Data], + ?assertMatch( + #{ jwt := "******" + , jwk := "******" + }, + State), + ok. + +t_lookup_ok(_Config) -> + Config = #{ table := Table + , resource_id := ResourceId + , private_key := PrivateKeyPEM + , aud := Aud + , iss := Iss + , sub := Sub + , kid := KId + } = generate_config(), + {ok, Worker} = emqx_rule_engine_jwt_worker:start_link(Config), + Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Worker), + receive + {Ref, token_created} -> + ok + after + 500 -> + error(timeout) + end, + Res = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId), + ?assertMatch({ok, _}, Res), + {ok, JWT} = Res, + ?assert(is_binary(JWT)), + JWK = jose_jwk:from_pem(PrivateKeyPEM), + {IsValid, ParsedJWT, JWS} = jose_jwt:verify_strict(JWK, [<<"RS256">>], JWT), + ?assertMatch( + #jose_jwt{ + fields = #{ <<"aud">> := Aud + , <<"iss">> := Iss + , <<"sub">> := Sub + , <<"exp">> := _ + , <<"iat">> := _ + }}, + ParsedJWT), + ?assertNot(is_expired(JWT)), + ?assertMatch( + #jose_jws{ + alg = {_, 'RS256'}, + fields = #{ <<"kid">> := KId + , <<"typ">> := <<"JWT">> + }}, + JWS), + ?assert(IsValid), + ok. + +t_lookup_not_found(_Config) -> + Table = ets:new(test_jwt_table, [ordered_set, public]), + InexistentResource = <<"xxx">>, + ?assertEqual({error, not_found}, + emqx_rule_engine_jwt:lookup_jwt(Table, InexistentResource)), + ok. + +t_lookup_badarg(_Config) -> + InexistentTable = i_dont_exist, + InexistentResource = <<"xxx">>, + ?assertEqual({error, not_found}, + emqx_rule_engine_jwt:lookup_jwt(InexistentTable, InexistentResource)), + ok. + +t_start_supervised_worker(_Config) -> + {ok, _} = emqx_rule_engine_jwt_sup:start_link(), + Config = #{resource_id := ResourceId} = generate_config(), + {ok, Pid} = emqx_rule_engine_jwt_sup:ensure_worker_present(ResourceId, Config), + Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Pid), + receive + {Ref, token_created} -> + ok + after + 5_000 -> + ct:fail("timeout") + end, + MRef = monitor(process, Pid), + ?assert(is_process_alive(Pid)), + ok = emqx_rule_engine_jwt_sup:ensure_worker_deleted(ResourceId), + receive + {'DOWN', MRef, process, Pid, _} -> + ok + after + 1_000 -> + ct:fail("timeout") + end, + ok. diff --git a/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl index 95445bf84..ae3f72254 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl @@ -136,16 +136,22 @@ t_preproc_sql5(_) -> emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)). t_if_contains_placeholder(_) -> - ?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}">>)), - ?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}${b}">>)), - ?assert(emqx_rule_utils:if_contains_placeholder(<<"${a},${b},${c}">>)), - ?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a}">>)), - ?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a},b:${b}">>)), - ?assert(emqx_rule_utils:if_contains_placeholder(<<"abc${ab}">>)), - ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a">>)), - ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc$">>)), - ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${">>)), - ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${a">>)), - ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${ab">>)), - ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a${ab${c${e">>)), - ok. + TestTab = + [ {true, "${a}"} + , {true, "${a}${b}"} + , {true, "${a},${b},${c}"} + , {true, "a:${a}"} + , {true, "a:${a},b:${b}"} + , {true, "abc${ab}"} + , {true, "a${ab${c}${e"} + , {false, "a"} + , {false, "abc$"} + , {false, "abc${"} + , {false, "abc${a"} + , {false, "abc${ab"} + , {false, "a${ab${c${e"} + ], + lists:foreach(fun({Expected, InputStr}) -> + ?assert(Expected =:= emqx_rule_utils:if_contains_placeholder(InputStr)), + ?assert(Expected =:= emqx_rule_utils:if_contains_placeholder(iolist_to_binary(InputStr))) + end, TestTab). diff --git a/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl index 7e49f0cb9..77554abf8 100644 --- a/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl @@ -56,7 +56,10 @@ groups() -> {rulesql_select_events, [], [ t_sqlparse_event_client_connected_01 , t_sqlparse_event_client_connected_02 - , t_sqlparse_event_client_disconnected + , t_sqlparse_event_client_disconnected_normal + , t_sqlparse_event_client_disconnected_kicked + , t_sqlparse_event_client_disconnected_discarded + , t_sqlparse_event_client_disconnected_takeovered , t_sqlparse_event_session_subscribed , t_sqlparse_event_session_unsubscribed , t_sqlparse_event_message_delivered @@ -145,6 +148,12 @@ end_per_group(_Groupname, _Config) -> %% Testcase specific setup/teardown %%------------------------------------------------------------------------------ +init_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) -> + application:set_env(emqx, client_disconnect_discarded, true), + Config; +init_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) -> + application:set_env(emqx, client_disconnect_takeovered, true), + Config; init_per_testcase(_TestCase, Config) -> init_events_counters(), ok = emqx_rule_registry:register_resource_types( @@ -152,6 +161,12 @@ init_per_testcase(_TestCase, Config) -> %ct:pal("============ ~p", [ets:tab2list(emqx_resource_type)]), Config. +end_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) -> + application:set_env(emqx, client_disconnect_takeovered, false), %% back to default + Config; +end_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) -> + application:set_env(emqx, client_disconnect_discarded, false), %% back to default + Config; end_per_testcase(_TestCase, _Config) -> ok. @@ -523,9 +538,118 @@ t_sqlparse_event_client_connected_02(_Config) -> emqx_rule_registry:remove_rule(TopicRule). %% FROM $events/client_disconnected -t_sqlparse_event_client_disconnected(_Config) -> - %% TODO - ok. +t_sqlparse_event_client_disconnected_normal(_Config) -> + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/normal">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, RepubT, 0), + ct:sleep(200), + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + emqtt:disconnect(Client1), + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_normal) + end, + emqtt:stop(Client), + + emqx_rule_registry:remove_rule(TopicRule). + +t_sqlparse_event_client_disconnected_kicked(_Config) -> + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/kicked">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(ClientRecvRepub), + {ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + emqx_cm:kick_session(<<"emqx">>), + unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}} + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_kicked) + end, + emqtt:stop(ClientRecvRepub), + emqx_rule_registry:remove_rule(TopicRule). + +t_sqlparse_event_client_disconnected_discarded(_Config) -> + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/discarded">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(ClientRecvRepub), + {ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}} + + {ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]), + {ok, _} = emqtt:connect(Client2), + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_discarded) + end, + + emqtt:stop(ClientRecvRepub), emqtt:stop(Client2), + emqx_rule_registry:remove_rule(TopicRule). + +t_sqlparse_event_client_disconnected_takeovered(_Config) -> + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/takeovered">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(ClientRecvRepub), + {ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}} + + {ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]), + {ok, _} = emqtt:connect(Client2), + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"takeovered">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_discarded) + end, + + emqtt:stop(ClientRecvRepub), emqtt:stop(Client2), + emqx_rule_registry:remove_rule(TopicRule). %% FROM $events/session_subscribed t_sqlparse_event_session_subscribed(_Config) -> diff --git a/build b/build index eaae30b83..6206d9f89 100755 --- a/build +++ b/build @@ -67,7 +67,14 @@ make_rel() { } relup_db() { - ./scripts/relup-base-vsns.escript "$@" ./data/relup-paths.eterm + case "$PROFILE" in + *-ee*) + ./scripts/relup-base-vsns.escript "$@" ./data/relup-paths-ee.eterm + ;; + *) + ./scripts/relup-base-vsns.escript "$@" ./data/relup-paths.eterm + ;; + esac } ## unzip previous version .zip files to _build/$PROFILE/rel/emqx/releases before making relup @@ -84,7 +91,6 @@ make_relup() { if [[ "$OTP_BASE" != "$OTP_VSN" ]]; then OTP_CHANGED='yes' fi - OTP_BASE=$(relup_db otp-vsn-for "$PKG_VSN") zip_file="_upgrade_base/${PROFILE}-$(env OTP_VSN="$OTP_BASE" PKG_VSN="$BASE_VSN" ./scripts/pkg-full-vsn.sh 'vsn_exact').zip" if [ ! -d "$releases_dir/$BASE_VSN" ]; then local tmp_dir diff --git a/changes/v4.3.22-en.md b/changes/v4.3.22-en.md index c902fbf61..0004ce400 100644 --- a/changes/v4.3.22-en.md +++ b/changes/v4.3.22-en.md @@ -36,9 +36,16 @@ For example: `acl_order = jwt,http`, this will make sure `jwt` is always checked before `http`, meaning if JWT is not found (or no `acl` cliam) for a client, then the ACL check will fallback to use the HTTP backend. + +- Added configurations to enable more `client.disconnected` events (and counter bumps) [#9267](https://github.com/emqx/emqx/pull/9267). + Prior to this change, the `client.disconnected` event (and counter bump) is triggered when a client + performs a 'normal' disconnect, or is 'kicked' by system admin, but NOT triggered when a + stale connection had to be 'discarded' (for clean session) or 'takenover' (for non-clean session). + Now it is possible to set configs `broker.client_disconnect_discarded` and `broker.client_disconnect_takenover` to `on` to enable the event in these scenarios. + ## Bug fixes -- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224). +- Fix that after uploading a backup file with an non-ASCII filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224). - Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places [#9185](https://github.com/emqx/emqx/pull/9185). This is to avoid displaying floats like `0.30000000000000004` on the dashboard. @@ -62,3 +69,5 @@ - Make sure Rule-Engine API supports Percent-encoding `rule_id` and `resource_id` in HTTP request path [#9190](https://github.com/emqx/emqx/pull/9190). Note that the `id` in `POST /api/v4/rules` should be literals (not encoded) when creating a `rule` or `resource`. See docs [Create Rule](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) [Create Resource](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources). + +- When republishing messages or bridge messages to other brokers, check the validity of the topic and make sure it does not have topic wildcards [#9291](https://github.com/emqx/emqx/pull/9291). diff --git a/changes/v4.3.22-zh.md b/changes/v4.3.22-zh.md index b969dd4c2..cb1bf8e67 100644 --- a/changes/v4.3.22-zh.md +++ b/changes/v4.3.22-zh.md @@ -32,9 +32,14 @@ 例如,`acl_order = jwt,http`,可以用于保证 `jwt` 这个模块总是排在 `http` 的前面, 也就是说,在对客户端进行 ACL 检查时,如果 JWT 不存在(或者没有定义 ACL),那么回退到使用 HTTP。 +- 为更多类型的 `client.disconnected` 事件(计数器触发)提供可配置项 [#9267](https://github.com/emqx/emqx/pull/9267)。 + 此前,`client.disconnected` 事件及计数器仅会在客户端正常断开连接或客户端被系统管理员踢出时触发, + 但不会在旧 session 被废弃 (clean_session = true) 或旧 session 被接管 (clean_session = false) 时被触发。 + 可将 `broker.client_disconnect_discarded` 和 `broker.client_disconnect_takovered` 选项设置为 `on` 来启用此场景下的客户端断连事件。 + ## 修复 -- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。 +- 修复若上传的备份文件名中包含非 ASCII 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。 - 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)。 避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。 @@ -58,3 +63,5 @@ - 使规则引擎 API 在 HTTP 请求路径中支持百分号编码的 `rule_id` 及 `resource_id` [#9190](https://github.com/emqx/emqx/pull/9190)。 注意在创建规则或资源时,HTTP body 中的 `id` 字段仍为字面值,而不是编码之后的值。 详情请参考 [创建规则](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) 和 [创建资源](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources)。 + +- 在进行消息重发布或桥接消息到其他 mqtt broker 时,检查 topic 合法性,确定其不带有主题通配符 [#9291](https://github.com/emqx/emqx/pull/9291)。 diff --git a/changes/v4.4.11-en.md b/changes/v4.4.11-en.md index c04b0cc2e..68bf692ed 100644 --- a/changes/v4.4.11-en.md +++ b/changes/v4.4.11-en.md @@ -18,6 +18,10 @@ - Added support for specifying custom modules for custom authentication [#9297](https://github.com/emqx/emqx/pull/9297). To support simple authentication rules, it is no longer necessary to implement a full-blown plugin. +- Added a JWT management for Rule-Engin, for creating and refreshing JWT tokens in rule engine actions [#9241](https://github.com/emqx/emqx/pull/9241). + This feature is so far only used in EMQX Enterprise Google PubSub integration. + Can be used as webhook integration's JWT authenticationa against the webhook service endpoint. + ### Bug fixes - Fix get trace list crash when trace not initialize. [#9156](https://github.com/emqx/emqx/pull/9156) diff --git a/changes/v4.4.11-zh.md b/changes/v4.4.11-zh.md index a7d0a9f39..d489a68a1 100644 --- a/changes/v4.4.11-zh.md +++ b/changes/v4.4.11-zh.md @@ -15,6 +15,10 @@ - 增加了可定制的认证回调模块 [#9297](https://github.com/emqx/emqx/pull/9297)。 对于一些简单的认证检查,不需要去实现一个完整的认证插件。 +- 为规则引擎增加了一个 JWT 令牌管理,用于在规则引擎动作中创建和刷新 JWT 令牌 [#9241](https://github.com/emqx/emqx/pull/9241)。 + 该功能现在仅用于 EMQX 企业版的 Google PubSub 集成中。 + 后续会用于 webhook 集成的 JWT 认证。 + ### 修复 - 修复日志追踪模块没开启时,GET Trace 列表接口报错的问题。[#9156](https://github.com/emqx/emqx/pull/9156) diff --git a/etc/emqx.conf b/etc/emqx.conf index 0ec15be31..143f93a8f 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2504,6 +2504,16 @@ broker.route_batch_clean = off ## - false: disable trie path compaction # broker.perf.trie_compaction = true +## Enable client disconnect event will be triggered by which reasons. +## Value: on | off +## `takeover`: session was takenover by another client with same client ID. (clean_session = false) +## Default: off +## `discard`: session was takeover by another client with same client ID. (clean_session = true) +## Default: off +## +# broker.client_disconnect_discarded = off +# broker.client_disconnect_takeovered = off + ## CONFIG_SECTION_BGN=sys_mon ================================================== ## Enable Long GC monitoring. Disable if the value is 0. diff --git a/lib-ce/emqx_modules/test/emqx_mod_sup_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_sup_SUITE.erl index d2453bb2e..72b8e8602 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_sup_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_sup_SUITE.erl @@ -23,6 +23,18 @@ all() -> emqx_ct:all(?MODULE). +init_per_suite(Config) -> + %% do not start the application + %% only testing the root supervisor in this suite + application:stop(emqx_modules), + {ok, Pid} = emqx_mod_sup:start_link(), + unlink(Pid), + Config. + +end_per_suite(_Config) -> + exit(whereis(emqx_mod_sup), kill), + ok. + %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index c893d0521..361374ee5 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2669,6 +2669,20 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Configuration of disconnected event reason. +%% `takeover`: session was takenover by another client with same client ID. (clean_session = false) +%% `discard`: session was takeover by another client with same client ID. (clean_session = true) +{mapping, "broker.client_disconnect_discarded", "emqx.client_disconnect_discarded", [ + {default, off}, + {datatype, flag} +]}. + +{mapping, "broker.client_disconnect_takeovered", "emqx.client_disconnect_takeovered", [ + {default, off}, + {datatype, flag} +]}. + + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/scripts/find-apps.sh b/scripts/find-apps.sh index c4a0eec62..9f188cf34 100755 --- a/scripts/find-apps.sh +++ b/scripts/find-apps.sh @@ -5,6 +5,29 @@ set -euo pipefail # ensure dir cd -P -- "$(dirname -- "$0")/.." +help() { + echo + echo "-h|--help: To display this usage info" + echo "--json: Print apps in json" +} + +WANT_JSON='no' +while [ "$#" -gt 0 ]; do + case $1 in + -h|--help) + help + exit 0 + ;; + --json) + WANT_JSON='yes' + shift 1 + ;; + *) + echo "unknown option $1" + exit 1 + ;; + esac +done if [ "$(./scripts/get-distro.sh)" = 'windows' ]; then # Otherwise windows may resolve to find.exe FIND="/usr/bin/find" @@ -17,17 +40,26 @@ find_app() { "$FIND" "${appdir}" -mindepth 1 -maxdepth 1 -type d } -# append emqx application first -echo 'emqx' +EM="emqx" +CE="$(find_app 'apps')" -find_app 'apps' if [ -f 'EMQX_ENTERPRISE' ]; then - find_app 'lib-ee' + LIB="$(find_app 'lib-ee')" else - find_app 'lib-ce' + LIB="$(find_app 'lib-ce')" fi ## find directories in lib-extra -find_app 'lib-extra' +LIBE="$(find_app 'lib-extra')" + ## find symlinks in lib-extra -"$FIND" 'lib-extra' -mindepth 1 -maxdepth 1 -type l -exec test -e {} \; -print +LIBES="$("$FIND" 'lib-extra' -mindepth 1 -maxdepth 1 -type l -exec test -e {} \; -print)" + +APPS_ALL="$(echo -e "${EM}\n${CE}\n${LIB}\n${LIBE}\n${LIBES}")" + +if [ "$WANT_JSON" = 'yes' ]; then + echo "${APPS_ALL}" | xargs | tr -d '\n' | jq -R -s -c 'split(" ")' +else + echo "${APPS_ALL}" +fi + diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 60301abd5..1d726fa89 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -998,7 +998,13 @@ handle_call(discard, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); %% Session Takeover -handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> +handle_call({takeover, 'begin'}, Channel = #channel{ + session = Session, + conninfo = #{clientid := ClientId} + }) -> + ?tp(debug, + emqx_channel_takeover_begin, + #{clientid => ClientId}), reply(Session, Channel#channel{takeover = true}); handle_call({takeover, 'end'}, Channel = #channel{session = Session, @@ -1748,7 +1754,16 @@ parse_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). %%-------------------------------------------------------------------- -%% Ensure disconnected +%% Maybe & Ensure disconnected + +ensure_disconnected(connected, Reason, Channel) + when Reason =:= discarded orelse Reason =:= takeovered -> + case is_disconnect_event_enabled(Reason) of + true -> ensure_disconnected(Reason, Channel); + false -> Channel + end; +ensure_disconnected(_, _, Channel) -> + Channel. ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> @@ -1845,12 +1860,15 @@ shutdown(Reason, Reply, Channel) -> shutdown(Reason, Reply, Packet, Channel) -> {shutdown, Reason, Reply, Packet, Channel}. +%% mqtt v5 connected sessions disconnect_and_shutdown(Reason, Reply, Channel = ?IS_MQTT_V5 = #channel{conn_state = connected}) -> - shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel); - -disconnect_and_shutdown(Reason, Reply, Channel) -> - shutdown(Reason, Reply, Channel). + NChannel = ensure_disconnected(connected, Reason, Channel), + shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel); +%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions +disconnect_and_shutdown(Reason, Reply, Channel= #channel{conn_state = ConnState}) -> + NChannel = ensure_disconnected(ConnState, Reason, Channel), + shutdown(Reason, Reply, NChannel). sp(true) -> 1; sp(false) -> 0. @@ -1858,6 +1876,11 @@ sp(false) -> 0. flag(true) -> 1; flag(false) -> 0. +is_disconnect_event_enabled(discarded) -> + emqx:get_env(client_disconnect_discarded, false); +is_disconnect_event_enabled(takeovered) -> + emqx:get_env(client_disconnect_takeovered, false). + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index e887d35f4..0f0e7d33f 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -825,15 +825,18 @@ t_enrich_connack_caps(_) -> wildcard_subscription => true } end), - AckProps = emqx_channel:enrich_connack_caps(#{}, channel()), - ?assertMatch(#{'Retain-Available' := 1, - 'Maximum-Packet-Size' := 1024, - 'Topic-Alias-Maximum' := 10, - 'Wildcard-Subscription-Available' := 1, - 'Subscription-Identifier-Available' := 1, - 'Shared-Subscription-Available' := 1 - }, AckProps), - ok = meck:unload(emqx_mqtt_caps). + try + AckProps = emqx_channel:enrich_connack_caps(#{}, channel()), + ?assertMatch(#{'Retain-Available' := 1, + 'Maximum-Packet-Size' := 1024, + 'Topic-Alias-Maximum' := 10, + 'Wildcard-Subscription-Available' := 1, + 'Subscription-Identifier-Available' := 1, + 'Shared-Subscription-Available' := 1 + }, AckProps) + after + ok = meck:unload(emqx_mqtt_caps) + end. %%-------------------------------------------------------------------- %% Test cases for terminate diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index c2b1a477b..8afd6a3be 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -52,13 +52,15 @@ t_check_sub(_) -> wildcard_subscription => false }, emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), - timer:sleep(50), - ClientInfo = #{zone => zone}, - ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts), - ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, - emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)), - ?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)), - ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})), - emqx_zone:unset_env(zone, '$mqtt_pub_caps'). + try + ClientInfo = #{zone => zone}, + ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts), + ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, + emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)), + ?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, + emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)), + ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, + emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})) + after + emqx_zone:unset_env(zone, '$mqtt_pub_caps') + end. diff --git a/test/emqx_mqtt_protocol_v5_SUITE.erl b/test/emqx_mqtt_protocol_v5_SUITE.erl index 05424d642..33592b849 100644 --- a/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -468,8 +468,8 @@ t_connack_max_qos_allowed(_) -> %% max_qos_allowed = 0 emqx_zone:set_env(external, max_qos_allowed, 0), - persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), - persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + emqx_zone:unset_env(external, '$mqtt_caps'), + emqx_zone:unset_env(external, '$mqtt_pub_caps'), {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Connack1} = emqtt:connect(Client1), @@ -496,8 +496,8 @@ t_connack_max_qos_allowed(_) -> %% max_qos_allowed = 1 emqx_zone:set_env(external, max_qos_allowed, 1), - persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), - persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + emqx_zone:unset_env(external, '$mqtt_caps'), + emqx_zone:unset_env(external, '$mqtt_pub_caps'), {ok, Client3} = emqtt:start_link([{proto_ver, v5}]), {ok, Connack3} = emqtt:connect(Client3), @@ -524,8 +524,8 @@ t_connack_max_qos_allowed(_) -> %% max_qos_allowed = 2 emqx_zone:set_env(external, max_qos_allowed, 2), - persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), - persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + emqx_zone:unset_env(external, '$mqtt_caps'), + emqx_zone:unset_env(external, '$mqtt_pub_caps'), {ok, Client5} = emqtt:start_link([{proto_ver, v5}]), {ok, Connack5} = emqtt:connect(Client5), diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index af7093dd7..c2d16275e 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -31,12 +31,23 @@ start_slave(Name) -> start_slave(Name, #{}). start_slave(Name, Opts) -> + SlaveMod = maps:get(slave_mod, Opts, ct_slave), Node = make_node_name(Name), - case ct_slave:start(Node, [{kill_if_fail, true}, - {monitor_master, true}, - {init_timeout, 10000}, - {startup_timeout, 10000}, - {erl_flags, ebin_path()}]) of + DoStart = + fun() -> + case SlaveMod of + ct_slave -> + ct_slave:start(Node, + [{kill_if_fail, true}, + {monitor_master, true}, + {init_timeout, 10000}, + {startup_timeout, 10000}, + {erl_flags, ebin_path()}]); + slave -> + slave:start_link(host(), Name, ebin_path()) + end + end, + case DoStart() of {ok, _} -> ok; {error, started_not_connected, _} -> @@ -115,6 +126,9 @@ setup_node(Node, #{} = Opts) -> ?assertEqual( node() , gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []]) ), + + ok = snabbkaffe:forward_trace(Node), + ok. %% Routes are replicated async. diff --git a/test/emqx_router_SUITE.erl b/test/emqx_router_SUITE.erl index 5e15f858e..f787d70c1 100644 --- a/test/emqx_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -32,6 +32,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:stop_apps([]). init_per_testcase(_TestCase, Config) -> diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 2112f0b8c..897a59cb6 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -51,7 +51,7 @@ init_per_suite(Config) -> PortDiscovery = application:get_env(gen_rpc, port_discovery), application:set_env(gen_rpc, port_discovery, stateless), application:ensure_all_started(gen_rpc), - %% ensure emqx_moduels' app modules are loaded + %% ensure emqx_modules app modules are loaded %% so the mnesia tables are created ok = load_app(emqx_modules), emqx_ct_helpers:start_apps([]), diff --git a/test/emqx_takeover_SUITE.erl b/test/emqx_takeover_SUITE.erl index 3317317c5..c7e3c9703 100644 --- a/test/emqx_takeover_SUITE.erl +++ b/test/emqx_takeover_SUITE.erl @@ -32,25 +32,44 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]), ok. + +init_per_testcase(Case, Config) -> + ?MODULE:Case({'init', Config}). + +end_per_testcase(Case, Config) -> + ?MODULE:Case({'end', Config}). + %%-------------------------------------------------------------------- %% Testcases -t_takeover(_) -> - process_flag(trap_exit, true), +t_takeover({init, Config}) when is_list(Config) -> + Config; +t_takeover({'end', Config}) when is_list(Config) -> + ok; +t_takeover(Config) when is_list(Config) -> AllMsgs = messages(?CNT), Pos = rand:uniform(?CNT), - - ClientId = <<"clientid">>, - {ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), - {ok, _} = emqtt:connect(C1), + ClientId = random_clientid(), + ClientOpts = [{clientid, ClientId}, + {clean_start, false}, + {host, "127.0.0.1"}, + {port, 1883} + ], + C1 = + with_retry( + fun() -> + {ok, C} = emqtt:start_link(ClientOpts), + {ok, _} = emqtt:connect(C), + C + end, 5), emqtt:subscribe(C1, <<"t">>, 1), - spawn(fun() -> [begin emqx:publish(lists:nth(I, AllMsgs)), @@ -59,31 +78,65 @@ t_takeover(_) -> end), emqtt:pause(C1), timer:sleep(?CNT*10), - load_meck(ClientId), - spawn(fun() -> - [begin - emqx:publish(lists:nth(I, AllMsgs)), - timer:sleep(rand:uniform(10)) - end || I <- lists:seq(Pos+1, ?CNT)] - end), - {ok, C2} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), - {ok, _} = emqtt:connect(C2), - - Received = all_received_publishs(), - ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]), - assert_messages_missed(AllMsgs, Received), - assert_messages_order(AllMsgs, Received), - - emqtt:disconnect(C2), - unload_meck(ClientId). - -t_takover_in_cluster(_) -> - todo. + try + spawn(fun() -> + [begin + emqx:publish(lists:nth(I, AllMsgs)), + timer:sleep(rand:uniform(10)) + end || I <- lists:seq(Pos+1, ?CNT)] + end), + {ok, C2} = emqtt:start_link(ClientOpts), + %% C1 is going down, unlink it so the test can continue to run + _ = monitor(process, C1), + ?assert(erlang:is_process_alive(C1)), + unlink(C1), + {ok, _} = emqtt:connect(C2), + receive + {'DOWN', _, process, C1, _} -> + ok + after 1000 -> + ct:fail("timedout_waiting_for_old_connection_shutdown") + end, + Received = all_received_publishs(), + ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]), + assert_messages_missed(AllMsgs, Received), + assert_messages_order(AllMsgs, Received), + kill_process(C2, fun emqtt:stop/1) + after + unload_meck(ClientId) + end. %%-------------------------------------------------------------------- %% Helpers +random_clientid() -> + iolist_to_binary(["clientid", "-", integer_to_list(erlang:system_time())]). + +kill_process(Pid, WithFun) -> + _ = unlink(Pid), + _ = monitor(process, Pid), + try WithFun(Pid) + catch _:_ -> ok + end, + receive + {'DOWN', _, process, Pid, _} -> + ok + after 10_000 -> + exit(Pid, kill), + error(timeout) + end. + +with_retry(Fun, 1) -> Fun(); +with_retry(Fun, N) when N > 1 -> + try + Fun() + catch + _ : _ -> + ct:sleep(1000), + with_retry(Fun, N - 1) + end. + load_meck(ClientId) -> meck:new(fake_conn_mod, [non_strict]), HookTakeover = fun(Pid, Msg = {takeover, 'begin'}) ->