Merge remote-tracking branch 'origin/release-v44' into ocsp-rv44
This commit is contained in:
commit
51a959c3b4
|
@ -1,8 +1,4 @@
|
|||
name: 'Detect profiles'
|
||||
inputs:
|
||||
ci_git_token:
|
||||
required: true
|
||||
type: string
|
||||
outputs:
|
||||
profiles:
|
||||
description: 'Detected profiles'
|
||||
|
@ -14,13 +10,18 @@ runs:
|
|||
- id: detect-profiles
|
||||
shell: bash
|
||||
run: |
|
||||
git config --global --add safe.directory "$GITHUB_WORKSPACE"
|
||||
if make emqx-ee --dry-run > /dev/null 2>&1; then
|
||||
echo "::set-output name=profiles::[\"emqx-ee\"]"
|
||||
echo "https://ci%40emqx.io:${{ inputs.ci_git_token }}@github.com" > $HOME/.git-credentials
|
||||
git config --global credential.helper store
|
||||
if [ -d source ]; then
|
||||
## source code downloaded
|
||||
cd source
|
||||
fi
|
||||
if [ ! -d .git ]; then
|
||||
echo "Not git dir, $(pwd)"
|
||||
exit 1
|
||||
fi
|
||||
if [ -f 'EMQX_ENTERPRISE' ]; then
|
||||
echo "profiles=[\"emqx-ee\"]" >> $GITHUB_OUTPUT
|
||||
echo "EMQX_NAME=emqx-ee" >> $GITHUB_ENV
|
||||
else
|
||||
echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]"
|
||||
echo "profiles=[\"emqx\", \"emqx-edge\"]" >> $GITHUB_OUTPUT
|
||||
echo "EMQX_NAME=emqx" >> $GITHUB_ENV
|
||||
fi
|
||||
|
|
|
@ -27,27 +27,23 @@ runs:
|
|||
shell: bash
|
||||
run: |
|
||||
brew update
|
||||
brew install curl zip unzip gnu-sed kerl coreutils unixodbc freetds openssl@1.1
|
||||
brew install curl zip unzip gnu-sed coreutils unixodbc freetds openssl@1.1
|
||||
echo "/usr/local/opt/bison/bin" >> $GITHUB_PATH
|
||||
echo "/usr/local/bin" >> $GITHUB_PATH
|
||||
- uses: actions/cache@v2
|
||||
- uses: actions/cache@v3
|
||||
id: cache
|
||||
with:
|
||||
path: ~/.kerl/${{ inputs.otp }}
|
||||
path: /opt/erlang/${{ inputs.otp }}
|
||||
key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit
|
||||
restore-keys: |
|
||||
otp-install-${{ inputs.otp }}-${{ inputs.os }}
|
||||
- name: build erlang
|
||||
if: steps.cache.outputs.cache-hit != 'true'
|
||||
shell: bash
|
||||
env:
|
||||
KERL_BUILD_BACKEND: git
|
||||
OTP_GITHUB_URL: https://github.com/emqx/otp
|
||||
KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit
|
||||
run: |
|
||||
kerl update releases
|
||||
kerl build ${{ inputs.otp }}
|
||||
kerl install ${{ inputs.otp }} $HOME/.kerl/${{ inputs.otp }}
|
||||
git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git $HOME/otp-${{ inputs.otp }}
|
||||
cd $HOME/otp-${{ inputs.otp }}
|
||||
./configure --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit --prefix=/opt/erlang/${{ inputs.otp }}
|
||||
make -j$(nproc)
|
||||
sudo make install
|
||||
- name: build
|
||||
env:
|
||||
AUTO_INSTALL_BUILD_DEPS: 1
|
||||
|
@ -60,7 +56,7 @@ runs:
|
|||
APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ inputs.apple_developer_id_bundle_password }}
|
||||
shell: bash
|
||||
run: |
|
||||
. $HOME/.kerl/${{ inputs.otp }}/activate
|
||||
export PATH="/opt/erlang/${{ inputs.otp }}/bin:$PATH"
|
||||
make ensure-rebar3
|
||||
sudo cp rebar3 /usr/local/bin/rebar3
|
||||
make ${EMQX_NAME}-zip
|
||||
|
|
|
@ -26,14 +26,12 @@ jobs:
|
|||
profiles: ${{ steps.detect-profiles.outputs.profiles}}
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
path: source
|
||||
fetch-depth: 0
|
||||
- id: detect-profiles
|
||||
- name: detect-profiles
|
||||
id: detect-profiles
|
||||
uses: ./source/.github/actions/detect-profiles
|
||||
with:
|
||||
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
|
||||
- name: get_all_deps
|
||||
if: endsWith(github.repository, 'emqx')
|
||||
run: |
|
||||
|
@ -97,10 +95,10 @@ jobs:
|
|||
echo "EQMX installed"
|
||||
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx uninstall
|
||||
echo "EQMX uninstaled"
|
||||
- uses: actions/upload-artifact@v1
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: ${{ matrix.profile }}-windows
|
||||
path: source/_packages/${{ matrix.profile }}/.
|
||||
path: source/_packages/${{ matrix.profile }}/
|
||||
|
||||
mac:
|
||||
needs: prepare
|
||||
|
@ -114,7 +112,7 @@ jobs:
|
|||
runs-on: ${{ matrix.os }}
|
||||
|
||||
steps:
|
||||
- uses: actions/download-artifact@v2
|
||||
- uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: source
|
||||
path: .
|
||||
|
@ -123,10 +121,7 @@ jobs:
|
|||
ln -s . source
|
||||
unzip -q source.zip
|
||||
rm source source.zip
|
||||
- id: detect-profiles
|
||||
uses: ./.github/actions/detect-profiles
|
||||
with:
|
||||
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
|
||||
- uses: ./.github/actions/detect-profiles
|
||||
- uses: ./.github/actions/package-macos
|
||||
with:
|
||||
otp: ${{ matrix.otp }}
|
||||
|
@ -135,10 +130,10 @@ jobs:
|
|||
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
|
||||
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
|
||||
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
|
||||
- uses: actions/upload-artifact@v1
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: ${EMQX_NAME}-${{ matrix.otp }}
|
||||
path: _packages/${EMQX_NAME}/.
|
||||
path: _packages/${EMQX_NAME}/
|
||||
|
||||
linux:
|
||||
runs-on: ubuntu-20.04
|
||||
|
@ -214,7 +209,7 @@ jobs:
|
|||
- uses: actions/upload-artifact@v1
|
||||
with:
|
||||
name: ${{ matrix.profile }}-${{ matrix.otp }}
|
||||
path: source/_packages/${{ matrix.profile }}/.
|
||||
path: source/_packages/${{ matrix.profile }}/
|
||||
|
||||
docker:
|
||||
runs-on: ubuntu-20.04
|
||||
|
|
|
@ -36,12 +36,15 @@ jobs:
|
|||
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- uses: ./.github/actions/detect-profiles
|
||||
with:
|
||||
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
|
||||
- name: fix-git-unsafe-repository
|
||||
run: git config --global --add safe.directory /__w/emqx/emqx
|
||||
- uses: actions/cache@v2
|
||||
- uses: ./.github/actions/detect-profiles
|
||||
- name: ensure access to github
|
||||
if: endsWith(github.repository, 'enterprise')
|
||||
run: |
|
||||
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
|
||||
git config --global credential.helper store
|
||||
- uses: actions/cache@v3
|
||||
with:
|
||||
# dialyzer PLTs
|
||||
path: ~/.cache/rebar3/
|
||||
|
@ -54,7 +57,7 @@ jobs:
|
|||
run: make ${EMQX_NAME}-zip
|
||||
- name: build deb/rpm packages
|
||||
run: make ${EMQX_NAME}-pkg
|
||||
- uses: actions/upload-artifact@v1
|
||||
- uses: actions/upload-artifact@v3
|
||||
if: failure()
|
||||
with:
|
||||
name: rebar3.crashdump
|
||||
|
@ -64,7 +67,7 @@ jobs:
|
|||
export CODE_PATH="$GITHUB_WORKSPACE"
|
||||
.ci/build_packages/tests.sh "${EMQX_NAME}" zip
|
||||
.ci/build_packages/tests.sh "${EMQX_NAME}" pkg
|
||||
- uses: actions/upload-artifact@v2
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: ${{ matrix.os }}
|
||||
path: _packages/**/*.zip
|
||||
|
@ -116,10 +119,13 @@ jobs:
|
|||
- macos-11
|
||||
runs-on: ${{ matrix.os }}
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- uses: actions/checkout@v3
|
||||
- name: ensure access to github
|
||||
if: endsWith(github.repository, 'enterprise')
|
||||
run: |
|
||||
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
|
||||
git config --global credential.helper store
|
||||
- uses: ./.github/actions/detect-profiles
|
||||
with:
|
||||
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
|
||||
- uses: ./.github/actions/package-macos
|
||||
with:
|
||||
otp: ${{ matrix.otp }}
|
||||
|
@ -128,13 +134,12 @@ jobs:
|
|||
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
|
||||
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
|
||||
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
|
||||
- uses: actions/upload-artifact@v1
|
||||
- uses: actions/upload-artifact@v3
|
||||
if: failure()
|
||||
with:
|
||||
name: rebar3.crashdump
|
||||
path: ./rebar3.crashdump
|
||||
|
||||
- uses: actions/upload-artifact@v2
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: macos
|
||||
path: _packages/**/*.zip
|
||||
|
|
|
@ -13,14 +13,9 @@ jobs:
|
|||
profiles: ${{ steps.detect-profiles.outputs.profiles}}
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
with:
|
||||
path: source
|
||||
fetch-depth: 0
|
||||
- uses: actions/checkout@v3
|
||||
- id: detect-profiles
|
||||
uses: ./source/.github/actions/detect-profiles
|
||||
with:
|
||||
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
|
||||
uses: ./.github/actions/detect-profiles
|
||||
|
||||
upload:
|
||||
runs-on: ubuntu-20.04
|
||||
|
@ -59,12 +54,10 @@ jobs:
|
|||
-X POST \
|
||||
-d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ github.ref_name }}\" }" \
|
||||
${{ secrets.EMQX_IO_RELEASE_API }}
|
||||
- uses: actions/checkout@v2
|
||||
with:
|
||||
fetch-depth: 0
|
||||
- uses: actions/checkout@v3
|
||||
- name: get version
|
||||
id: version
|
||||
run: echo "::set-output name=version::$(./pkg-vsn.sh)"
|
||||
run: echo "version=$(./pkg-vsn.sh)" >> $GITHUB_OUTPUT
|
||||
- uses: emqx/push-helm-action@v1
|
||||
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
|
||||
with:
|
||||
|
|
|
@ -10,140 +10,255 @@ on:
|
|||
pull_request:
|
||||
|
||||
jobs:
|
||||
run_proper_test:
|
||||
runs-on: ubuntu-20.04
|
||||
container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04
|
||||
prepare:
|
||||
runs-on: ubuntu-20.04
|
||||
container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04
|
||||
outputs:
|
||||
ct_apps: ${{ steps.run_find_apps.outputs.ct_apps }}
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
path: source
|
||||
fetch-depth: 0
|
||||
- name: git credentials
|
||||
run: |
|
||||
if make emqx-ee --dry-run > /dev/null 2>&1; then
|
||||
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
|
||||
git config --global credential.helper store
|
||||
fi
|
||||
- name: find_ct_apps
|
||||
working-directory: source
|
||||
id: run_find_apps
|
||||
# emqx_plugin_libs doesn't have a test suite -> excluded from app list
|
||||
# emqx ct is run independently -> exclude it from the app list
|
||||
run: |
|
||||
ct_apps="$(./scripts/find-apps.sh --json | jq -c 'del (.[] | select (. == "apps/emqx_plugin_libs" or . == "emqx"))')"
|
||||
echo "ct-apps: $ct_apps"
|
||||
echo "ct_apps=$ct_apps" >> $GITHUB_OUTPUT
|
||||
- name: get_all_deps
|
||||
working-directory: source
|
||||
run: |
|
||||
make deps-all
|
||||
./rebar3 as test compile
|
||||
cd ..
|
||||
zip -ryq source.zip source/* source/.[^.]*
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: source
|
||||
path: source.zip
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: set git credentials
|
||||
run: |
|
||||
if make emqx-ee --dry-run > /dev/null 2>&1; then
|
||||
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
|
||||
git config --global credential.helper store
|
||||
fi
|
||||
- name: proper
|
||||
run: make proper
|
||||
eunit_and_proper:
|
||||
needs: prepare
|
||||
runs-on: ubuntu-20.04
|
||||
container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
task:
|
||||
- eunit
|
||||
- proper
|
||||
steps:
|
||||
- uses: AutoModality/action-clean@v1
|
||||
- uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: source
|
||||
path: .
|
||||
- name: unzip source code
|
||||
run: unzip -o -q source.zip
|
||||
# produces eunit.coverdata and proper.coverdata
|
||||
- name: eunit and proper
|
||||
working-directory: source
|
||||
run: make ${{ matrix.task }}
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: cover
|
||||
path: source/_build/test/cover
|
||||
if-no-files-found: warn
|
||||
|
||||
run_common_test:
|
||||
runs-on: ${{ matrix.runs-on }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
runs-on:
|
||||
- aws-amd64
|
||||
- ubuntu-20.04
|
||||
use-self-hosted:
|
||||
- ${{ github.repository_owner == 'emqx' }}
|
||||
exclude:
|
||||
- runs-on: ubuntu-20.04
|
||||
use-self-hosted: true
|
||||
- runs-on: aws-amd64
|
||||
use-self-hosted: false
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
# to avoid dirty self-hosted runners
|
||||
- name: stop containers
|
||||
run: |
|
||||
docker rm -f $(docker ps -qa) || true
|
||||
docker network rm $(docker network ls -q) || true
|
||||
- name: docker compose up
|
||||
if: endsWith(github.repository, 'emqx')
|
||||
env:
|
||||
MYSQL_TAG: 8
|
||||
REDIS_TAG: 6
|
||||
MONGO_TAG: 4
|
||||
PGSQL_TAG: 13
|
||||
LDAP_TAG: 2.4.50
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
docker-compose \
|
||||
-f .ci/docker-compose-file/docker-compose.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
|
||||
up -d --build
|
||||
- name: docker compose up
|
||||
if: endsWith(github.repository, 'emqx-enterprise')
|
||||
env:
|
||||
MYSQL_TAG: 8
|
||||
REDIS_TAG: 6
|
||||
MONGO_TAG: 4
|
||||
PGSQL_TAG: 13
|
||||
LDAP_TAG: 2.4.50
|
||||
OPENTSDB_TAG: latest
|
||||
INFLUXDB_TAG: 1.7.6
|
||||
DYNAMODB_TAG: 1.11.477
|
||||
TIMESCALE_TAG: latest-pg11
|
||||
CASSANDRA_TAG: 3.11.6
|
||||
RABBITMQ_TAG: 3.7
|
||||
KAFKA_TAG: 2.5.0
|
||||
PULSAR_TAG: 2.3.2
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
timeout-minutes: 20
|
||||
run: |
|
||||
docker-compose \
|
||||
-f .ci/docker-compose-file/docker-compose.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-cassandra-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-dynamodb-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-influxdb-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-kafka-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-opentsdb-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-pulsar-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-rabbit-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-timescale-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-mysql-client.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-pgsql-and-timescale-client.yaml \
|
||||
up -d --build
|
||||
docker exec -i erlang bash -c "echo \"https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com\" > /root/.git-credentials && git config --global credential.helper store"
|
||||
docker exec -i erlang bash -c "git config --global --add safe.directory /emqx"
|
||||
while [ $(docker ps -a --filter name=client --filter exited=0 | wc -l) \
|
||||
!= $(docker ps -a --filter name=client | wc -l) ]; do
|
||||
sleep 5
|
||||
done
|
||||
- name: run eunit
|
||||
run: |
|
||||
docker exec -i erlang bash -c "make eunit"
|
||||
- name: run common test
|
||||
run: |
|
||||
docker exec -i erlang bash -c "make ct"
|
||||
- name: run cover
|
||||
run: |
|
||||
printenv > .env
|
||||
docker exec -i erlang bash -c "git config --global --add safe.directory /emqx"
|
||||
docker exec -i erlang bash -c "make cover"
|
||||
docker exec --env-file .env -i erlang bash -c "make coveralls"
|
||||
- name: cat rebar.crashdump
|
||||
if: failure()
|
||||
run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi
|
||||
- uses: actions/upload-artifact@v1
|
||||
if: failure()
|
||||
with:
|
||||
name: logs
|
||||
path: _build/test/logs
|
||||
- uses: actions/upload-artifact@v1
|
||||
with:
|
||||
name: cover
|
||||
path: _build/test/cover
|
||||
emqx_ct:
|
||||
needs: prepare
|
||||
runs-on: ubuntu-20.04
|
||||
container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04
|
||||
strategy:
|
||||
fail-fast: false
|
||||
steps:
|
||||
- uses: AutoModality/action-clean@v1
|
||||
- uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: source
|
||||
path: .
|
||||
- name: unzip source code
|
||||
run: unzip -o -q source.zip
|
||||
# produces emqx-emqx.coverdata
|
||||
- name: emqx-ct-pipeline
|
||||
working-directory: source
|
||||
run: make emqx-ct-pipeline
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: cover
|
||||
path: source/_build/test/cover
|
||||
if-no-files-found: warn
|
||||
|
||||
finish:
|
||||
needs: run_common_test
|
||||
runs-on: ubuntu-20.04
|
||||
steps:
|
||||
- name: Coveralls Finished
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
curl -v -k https://coveralls.io/webhook \
|
||||
--header "Content-Type: application/json" \
|
||||
--data "{\"repo_name\":\"$GITHUB_REPOSITORY\",\"repo_token\":\"$GITHUB_TOKEN\",\"payload\":{\"build_num\":$GITHUB_RUN_ID,\"status\":\"done\"}}" || true
|
||||
ct:
|
||||
needs: prepare
|
||||
runs-on: ${{ matrix.runs-on }}
|
||||
strategy:
|
||||
max-parallel: 12
|
||||
fail-fast: false
|
||||
matrix:
|
||||
app_name: ${{ fromJson(needs.prepare.outputs.ct_apps) }}
|
||||
runs-on:
|
||||
- aws-amd64
|
||||
- ubuntu-20.04
|
||||
use-self-hosted:
|
||||
- ${{ github.repository_owner == 'emqx' }}
|
||||
exclude:
|
||||
- runs-on: ubuntu-20.04
|
||||
use-self-hosted: true
|
||||
- runs-on: aws-amd64
|
||||
use-self-hosted: false
|
||||
steps:
|
||||
- uses: AutoModality/action-clean@v1
|
||||
- uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: source
|
||||
path: .
|
||||
- name: unzip source code
|
||||
run: unzip -q source.zip
|
||||
# to avoid dirty self-hosted runners
|
||||
- name: stop containers
|
||||
run: |
|
||||
docker rm -f $(docker ps -qa) || true
|
||||
docker network rm $(docker network ls -q) || true
|
||||
- name: docker compose up
|
||||
working-directory: source
|
||||
if: endsWith(github.repository, 'emqx')
|
||||
env:
|
||||
MYSQL_TAG: 8
|
||||
REDIS_TAG: 6
|
||||
MONGO_TAG: 4
|
||||
PGSQL_TAG: 13
|
||||
LDAP_TAG: 2.4.50
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
docker-compose \
|
||||
-f .ci/docker-compose-file/docker-compose.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
|
||||
up -d --build
|
||||
docker exec -i erlang bash -c "git config --global --add safe.directory /emqx"
|
||||
- name: docker compose up
|
||||
working-directory: source
|
||||
if: endsWith(github.repository, 'emqx-enterprise')
|
||||
env:
|
||||
MYSQL_TAG: 8
|
||||
REDIS_TAG: 6
|
||||
MONGO_TAG: 4
|
||||
PGSQL_TAG: 13
|
||||
LDAP_TAG: 2.4.50
|
||||
OPENTSDB_TAG: latest
|
||||
INFLUXDB_TAG: 1.7.6
|
||||
DYNAMODB_TAG: 1.11.477
|
||||
TIMESCALE_TAG: latest-pg11
|
||||
CASSANDRA_TAG: 3.11.6
|
||||
RABBITMQ_TAG: 3.7
|
||||
KAFKA_TAG: 2.5.0
|
||||
PULSAR_TAG: 2.3.2
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
timeout-minutes: 20
|
||||
run: |
|
||||
docker-compose \
|
||||
-f .ci/docker-compose-file/docker-compose.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-cassandra-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-dynamodb-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-influxdb-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-kafka-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-opentsdb-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-pulsar-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-rabbit-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-timescale-tcp.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-mysql-client.yaml \
|
||||
-f .ci/docker-compose-file/docker-compose-enterprise-pgsql-and-timescale-client.yaml \
|
||||
up -d --build
|
||||
docker exec -i erlang bash -c "echo \"https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com\" > /root/.git-credentials && git config --global credential.helper store"
|
||||
docker exec -i erlang bash -c "git config --global --add safe.directory /emqx"
|
||||
while [ $(docker ps -a --filter name=client --filter exited=0 | wc -l) \
|
||||
!= $(docker ps -a --filter name=client | wc -l) ]; do
|
||||
sleep 5
|
||||
done
|
||||
- name: run common test
|
||||
run: docker exec -i erlang bash -c "make ${{ matrix.app_name }}-ct-pipeline"
|
||||
- name: cat rebar.crashdump
|
||||
if: failure()
|
||||
working-directory: source
|
||||
run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi
|
||||
- name: set log file name
|
||||
if: failure()
|
||||
run: echo "LOGFILENAME=logs-$(echo ${{ matrix.app_name }} | tr '/' '_')" >> $GITHUB_ENV
|
||||
- uses: actions/upload-artifact@v3
|
||||
if: failure()
|
||||
with:
|
||||
name: ${{ env.LOGFILENAME }}
|
||||
path: source/_build/test/logs
|
||||
if-no-files-found: warn
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: cover
|
||||
path: source/_build/test/cover
|
||||
if-no-files-found: warn
|
||||
|
||||
make_cover:
|
||||
needs:
|
||||
- eunit_and_proper
|
||||
- emqx_ct
|
||||
- ct
|
||||
runs-on: ubuntu-20.04
|
||||
container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04
|
||||
steps:
|
||||
- uses: AutoModality/action-clean@v1
|
||||
- uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: source
|
||||
path: .
|
||||
- name: unzip source code
|
||||
run: unzip -q source.zip
|
||||
- uses: actions/download-artifact@v3
|
||||
name: download cover data
|
||||
with:
|
||||
name: cover
|
||||
path: source/_build/test/cover
|
||||
- name: make cover
|
||||
working-directory: source
|
||||
run: make cover
|
||||
- name: send to coveralls
|
||||
working-directory: source
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: make coveralls
|
||||
- name: get coveralls logs
|
||||
working-directory: source
|
||||
if: failure()
|
||||
run: cat rebar3.crashdump
|
||||
|
||||
finish:
|
||||
needs: make_cover
|
||||
runs-on: ubuntu-20.04
|
||||
steps:
|
||||
- name: Coveralls Finished
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
curl -v -k https://coveralls.io/webhook \
|
||||
--header "Content-Type: application/json" \
|
||||
--data "{\"repo_name\":\"$GITHUB_REPOSITORY\",\"repo_token\":\"$GITHUB_TOKEN\",\"payload\":{\"build_num\":$GITHUB_RUN_ID,\"status\":\"done\"}}" || true
|
||||
|
|
8
Makefile
8
Makefile
|
@ -62,6 +62,14 @@ $1-ct: $(REBAR)
|
|||
endef
|
||||
$(foreach app,$(APPS),$(eval $(call gen-app-ct-target,$(app))))
|
||||
|
||||
## app/name-ct-pipeline targets are used in pipeline -> make cover data for each app
|
||||
.PHONY: $(APPS:%=%-ct-pipeline)
|
||||
define gen-app-ct-target-pipeline
|
||||
$1-ct-pipeline: $(REBAR)
|
||||
$(REBAR) ct --name 'test@127.0.0.1' -c -v --cover_export_name $(PROFILE)-$(subst /,-,$1) --suite $(shell $(CURDIR)/scripts/find-suites.sh $1)
|
||||
endef
|
||||
$(foreach app,$(APPS),$(eval $(call gen-app-ct-target-pipeline,$(app))))
|
||||
|
||||
## apps/name-prop targets
|
||||
.PHONY: $(APPS:%=%-prop)
|
||||
define gen-app-prop-target
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
|
||||
-define(RESOURCE_TYPE_MQTT, 'bridge_mqtt').
|
||||
-define(RESOURCE_TYPE_RPC, 'bridge_rpc').
|
||||
-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish).
|
||||
|
||||
-define(RESOURCE_CONFIG_SPEC_MQTT, #{
|
||||
address => #{
|
||||
|
@ -494,7 +495,7 @@ on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName,
|
|||
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
|
||||
TopicTks = case ForwardTopic == <<"">> of
|
||||
true -> undefined;
|
||||
false -> emqx_rule_utils:preproc_tmpl(ForwardTopic)
|
||||
false -> emqx_rule_utils:preproc_tmpl(assert_topic_valid(ForwardTopic))
|
||||
end,
|
||||
Opts.
|
||||
|
||||
|
@ -515,7 +516,7 @@ on_action_data_to_mqtt_broker(Msg, _Env =
|
|||
qos = QoS,
|
||||
from = From,
|
||||
flags = Flags,
|
||||
topic = Topic1,
|
||||
topic = assert_topic_valid(Topic1),
|
||||
payload = format_data(PayloadTks, Msg),
|
||||
timestamp = TimeStamp},
|
||||
ecpool:with_client(PoolName,
|
||||
|
@ -583,7 +584,7 @@ options(Options, PoolName, ResId) ->
|
|||
Get = fun(Key) -> GetD(Key, undefined) end,
|
||||
Address = Get(<<"address">>),
|
||||
[{max_inflight_batches, 32},
|
||||
{forward_mountpoint, str(Get(<<"mountpoint">>))},
|
||||
{forward_mountpoint, str(assert_topic_valid(Get(<<"mountpoint">>)))},
|
||||
{disk_cache, cuttlefish_flag:parse(str(GetD(<<"disk_cache">>, "off")))},
|
||||
{start_type, auto},
|
||||
{reconnect_delay_ms, cuttlefish_duration:parse(str(Get(<<"reconnect_interval">>)), ms)},
|
||||
|
@ -610,6 +611,12 @@ options(Options, PoolName, ResId) ->
|
|||
| maybe_ssl(Options, Get(<<"ssl">>), ResId)]
|
||||
end.
|
||||
|
||||
assert_topic_valid(T) ->
|
||||
case emqx_topic:wildcard(T) of
|
||||
true -> throw({?BAD_TOPIC_WITH_WILDCARD, T});
|
||||
false -> T
|
||||
end.
|
||||
|
||||
maybe_ssl(_Options, false, _ResId) ->
|
||||
[];
|
||||
maybe_ssl(Options, true, ResId) ->
|
||||
|
|
|
@ -22,7 +22,7 @@ management.default_application.secret = public
|
|||
|
||||
## Initialize apps file
|
||||
## Is used to add administrative app/secrets when EMQX is launched for the first time.
|
||||
## This config will not take any effect once EMQX database is populated with the provided apps.
|
||||
## This config will not take any effect once EMQX database has one or more apps.
|
||||
## The file content format is as below:
|
||||
## ```
|
||||
##819e5db182cf:l9C5suZClIF3FvdzWqmINrVU61WNfIjcglxw9CVM7y1VI
|
||||
|
|
|
@ -32,8 +32,11 @@ init_per_suite(Cfg) ->
|
|||
ok = emqx_dashboard_admin:mnesia(boot),
|
||||
application:load(emqx_modules),
|
||||
application:load(emqx_bridge_mqtt),
|
||||
ekka_mnesia:start(),
|
||||
emqx_dashboard_admin:mnesia(boot),
|
||||
emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]),
|
||||
application:ensure_all_started(emqx_dashboard),
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
Cfg.
|
||||
|
||||
end_per_suite(Cfg) ->
|
||||
|
|
|
@ -41,6 +41,8 @@ init_per_suite(Config) ->
|
|||
Config.
|
||||
|
||||
end_per_suite(_) ->
|
||||
ok = application:unset_env(emqx_management, bootstrap_apps_file),
|
||||
_ = mnesia:clear_table(mqtt_app),
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
ok.
|
||||
|
||||
|
|
|
@ -28,3 +28,5 @@
|
|||
-define(bound_v(Key, ENVS0),
|
||||
maps:get(Key,
|
||||
maps:get(?BINDING_KEYS, ENVS0, #{}))).
|
||||
|
||||
-define(JWT_TABLE, emqx_rule_engine_jwt_table).
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{deps, []}.
|
||||
{deps, [ {jose, {git, "https://github.com/emqx/erlang-jose", {tag, "emqx-1.11.3"}}}
|
||||
]}.
|
||||
|
||||
%% Comple Opts
|
||||
{erl_opts, [warn_unused_vars,
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish).
|
||||
|
||||
-define(REPUBLISH_PARAMS_SPEC, #{
|
||||
target_topic => #{
|
||||
order => 1,
|
||||
|
@ -163,7 +165,7 @@ on_action_create_republish(Id, Params = #{
|
|||
}) ->
|
||||
TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)),
|
||||
TargetQoS = to_qos(TargetQoS0),
|
||||
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic),
|
||||
TopicTks = emqx_rule_utils:preproc_tmpl(assert_topic_valid(TargetTopic)),
|
||||
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
|
||||
Params.
|
||||
|
||||
|
@ -201,7 +203,7 @@ on_action_republish(Selected, _Envs = #{
|
|||
from = ActId,
|
||||
flags = Flags#{retain => get_retain(TargetRetain, Selected)},
|
||||
headers = #{republish_by => ActId},
|
||||
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
||||
topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)),
|
||||
payload = format_msg(PayloadTks, Selected),
|
||||
timestamp = Timestamp
|
||||
},
|
||||
|
@ -226,7 +228,7 @@ on_action_republish(Selected, _Envs = #{
|
|||
from = ActId,
|
||||
flags = #{dup => false, retain => get_retain(TargetRetain, Selected)},
|
||||
headers = #{republish_by => ActId},
|
||||
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
||||
topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)),
|
||||
payload = format_msg(PayloadTks, Selected),
|
||||
timestamp = erlang:system_time(millisecond)
|
||||
},
|
||||
|
@ -270,6 +272,12 @@ get_qos(-1, _Data, Default) -> Default;
|
|||
get_qos(TargetQoS, Data, _Default) ->
|
||||
qos(emqx_rule_utils:replace_var(TargetQoS, Data)).
|
||||
|
||||
assert_topic_valid(T) ->
|
||||
case emqx_topic:wildcard(T) of
|
||||
true -> throw({?BAD_TOPIC_WITH_WILDCARD, T});
|
||||
false -> T
|
||||
end.
|
||||
|
||||
qos(<<"0">>) -> 0;
|
||||
qos(<<"1">>) -> 1;
|
||||
qos(<<"2">>) -> 2;
|
||||
|
|
|
@ -2,8 +2,8 @@
|
|||
[{description, "EMQ X Rule Engine"},
|
||||
{vsn, "4.4.11"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
|
||||
{applications, [kernel,stdlib,rulesql,getopt]},
|
||||
{registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]},
|
||||
{applications, [kernel,stdlib,rulesql,getopt,jose]},
|
||||
{mod, {emqx_rule_engine_app, []}},
|
||||
{env, []},
|
||||
{licenses, ["Apache-2.0"]},
|
||||
|
|
|
@ -2,7 +2,13 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.4.10",
|
||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_rule_engine_jwt},
|
||||
{add_module,emqx_rule_engine_jwt_worker},
|
||||
{add_module,emqx_rule_engine_jwt_sup},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
|
@ -12,7 +18,13 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.9",
|
||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_rule_engine_jwt},
|
||||
{add_module,emqx_rule_engine_jwt_worker},
|
||||
{add_module,emqx_rule_engine_jwt_sup},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
|
@ -23,7 +35,13 @@
|
|||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.8",
|
||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_rule_engine_jwt},
|
||||
{add_module,emqx_rule_engine_jwt_worker},
|
||||
{add_module,emqx_rule_engine_jwt_sup},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
|
@ -35,7 +53,12 @@
|
|||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.4\\.[6-7]">>,
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_rule_engine_jwt},
|
||||
{add_module,emqx_rule_engine_jwt_worker},
|
||||
{add_module,emqx_rule_engine_jwt_sup},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
|
@ -48,7 +71,12 @@
|
|||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.5",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_rule_engine_jwt},
|
||||
{add_module,emqx_rule_engine_jwt_worker},
|
||||
{add_module,emqx_rule_engine_jwt_sup},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
|
@ -62,7 +90,12 @@
|
|||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.4",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_rule_engine_jwt},
|
||||
{add_module,emqx_rule_engine_jwt_worker},
|
||||
{add_module,emqx_rule_engine_jwt_sup},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
|
@ -76,7 +109,12 @@
|
|||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.3",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_rule_engine_jwt},
|
||||
{add_module,emqx_rule_engine_jwt_worker},
|
||||
{add_module,emqx_rule_engine_jwt_sup},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
|
@ -92,7 +130,12 @@
|
|||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.2",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_rule_engine_jwt},
|
||||
{add_module,emqx_rule_engine_jwt_worker},
|
||||
{add_module,emqx_rule_engine_jwt_sup},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
|
@ -109,7 +152,12 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.1",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_rule_engine_jwt},
|
||||
{add_module,emqx_rule_engine_jwt_worker},
|
||||
{add_module,emqx_rule_engine_jwt_sup},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
|
@ -126,7 +174,12 @@
|
|||
{add_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.0",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_rule_engine_jwt},
|
||||
{add_module,emqx_rule_engine_jwt_worker},
|
||||
{add_module,emqx_rule_engine_jwt_sup},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
|
@ -144,42 +197,64 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.10",
|
||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{apply,{supervisor,terminate_child,
|
||||
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
|
||||
{delete_module,emqx_rule_engine_jwt_sup},
|
||||
{delete_module,emqx_rule_engine_jwt_worker},
|
||||
{delete_module,emqx_rule_engine_jwt}]},
|
||||
{"4.4.9",
|
||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{apply,{supervisor,terminate_child,
|
||||
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
|
||||
{delete_module,emqx_rule_engine_jwt_sup},
|
||||
{delete_module,emqx_rule_engine_jwt_worker},
|
||||
{delete_module,emqx_rule_engine_jwt}]},
|
||||
{"4.4.8",
|
||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{apply,{supervisor,terminate_child,
|
||||
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
|
||||
{delete_module,emqx_rule_engine_jwt_sup},
|
||||
{delete_module,emqx_rule_engine_jwt_worker},
|
||||
{delete_module,emqx_rule_engine_jwt}]},
|
||||
{<<"4\\.4\\.[6-7]">>,
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
|
@ -188,11 +263,17 @@
|
|||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{apply,{supervisor,terminate_child,
|
||||
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
|
||||
{delete_module,emqx_rule_engine_jwt_sup},
|
||||
{delete_module,emqx_rule_engine_jwt_worker},
|
||||
{delete_module,emqx_rule_engine_jwt}]},
|
||||
{"4.4.5",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
|
@ -202,11 +283,17 @@
|
|||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{apply,{supervisor,terminate_child,
|
||||
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
|
||||
{delete_module,emqx_rule_engine_jwt_sup},
|
||||
{delete_module,emqx_rule_engine_jwt_worker},
|
||||
{delete_module,emqx_rule_engine_jwt}]},
|
||||
{"4.4.4",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
|
@ -216,10 +303,16 @@
|
|||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{apply,{supervisor,terminate_child,
|
||||
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
|
||||
{delete_module,emqx_rule_engine_jwt_sup},
|
||||
{delete_module,emqx_rule_engine_jwt_worker},
|
||||
{delete_module,emqx_rule_engine_jwt}]},
|
||||
{"4.4.3",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
|
@ -232,10 +325,16 @@
|
|||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{apply,{supervisor,terminate_child,
|
||||
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
|
||||
{delete_module,emqx_rule_engine_jwt_sup},
|
||||
{delete_module,emqx_rule_engine_jwt_worker},
|
||||
{delete_module,emqx_rule_engine_jwt}]},
|
||||
{"4.4.2",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
|
@ -249,10 +348,16 @@
|
|||
{delete_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{apply,{supervisor,terminate_child,
|
||||
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
|
||||
{delete_module,emqx_rule_engine_jwt_sup},
|
||||
{delete_module,emqx_rule_engine_jwt_worker},
|
||||
{delete_module,emqx_rule_engine_jwt}]},
|
||||
{"4.4.1",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
|
@ -266,10 +371,16 @@
|
|||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{delete_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{apply,{supervisor,terminate_child,
|
||||
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
|
||||
{delete_module,emqx_rule_engine_jwt_sup},
|
||||
{delete_module,emqx_rule_engine_jwt_worker},
|
||||
{delete_module,emqx_rule_engine_jwt}]},
|
||||
{"4.4.0",
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
|
@ -283,5 +394,10 @@
|
|||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{delete_module,emqx_rule_date}]},
|
||||
{apply,{supervisor,terminate_child,
|
||||
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
|
||||
{delete_module,emqx_rule_date},
|
||||
{delete_module,emqx_rule_engine_jwt_sup},
|
||||
{delete_module,emqx_rule_engine_jwt_worker},
|
||||
{delete_module,emqx_rule_engine_jwt}]},
|
||||
{<<".*">>,[]}]}.
|
||||
|
|
|
@ -277,7 +277,7 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) ->
|
|||
_ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok,
|
||||
init_resource, InitArgs)
|
||||
catch throw : Reason ->
|
||||
?LOG(error, "create_resource failed: ~0p", [Reason])
|
||||
?LOG_SENSITIVE(warning, "create_resource failed: ~0p", [Reason])
|
||||
end,
|
||||
{ok, Resource};
|
||||
no_retry ->
|
||||
|
@ -285,6 +285,7 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) ->
|
|||
_ = ?CLUSTER_CALL(init_resource, InitArgs),
|
||||
{ok, Resource}
|
||||
catch throw : Reason ->
|
||||
?LOG_SENSITIVE(error, "create_resource failed: ~0p", [Reason]),
|
||||
{error, Reason}
|
||||
end
|
||||
end;
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_rule_engine_jwt).
|
||||
|
||||
-include("rule_engine.hrl").
|
||||
-include("rule_actions.hrl").
|
||||
|
||||
%% API
|
||||
-export([ lookup_jwt/1
|
||||
, lookup_jwt/2
|
||||
]).
|
||||
|
||||
-type jwt() :: binary().
|
||||
|
||||
-spec lookup_jwt(resource_id()) -> {ok, jwt()} | {error, not_found}.
|
||||
lookup_jwt(ResourceId) ->
|
||||
?MODULE:lookup_jwt(?JWT_TABLE, ResourceId).
|
||||
|
||||
-spec lookup_jwt(ets:table(), resource_id()) -> {ok, jwt()} | {error, not_found}.
|
||||
lookup_jwt(TId, ResourceId) ->
|
||||
try
|
||||
case ets:lookup(TId, {ResourceId, jwt}) of
|
||||
[{{ResourceId, jwt}, JWT}] ->
|
||||
{ok, JWT};
|
||||
[] ->
|
||||
{error, not_found}
|
||||
end
|
||||
catch
|
||||
error:badarg ->
|
||||
{error, not_found}
|
||||
end.
|
|
@ -0,0 +1,89 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_rule_engine_jwt_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([ start_link/0
|
||||
, ensure_worker_present/2
|
||||
, ensure_worker_deleted/1
|
||||
]).
|
||||
|
||||
-export([init/1]).
|
||||
|
||||
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
|
||||
|
||||
-type worker_id() :: term().
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
ensure_jwt_table(),
|
||||
SupFlags = #{ strategy => one_for_one
|
||||
, intensity => 10
|
||||
, period => 5
|
||||
, auto_shutdown => never
|
||||
},
|
||||
ChildSpecs = [],
|
||||
{ok, {SupFlags, ChildSpecs}}.
|
||||
|
||||
%% @doc Starts a new JWT worker. The caller should use
|
||||
%% `emqx_rule_engine_jwt_sup:ensure_jwt/1' to ensure that a JWT has
|
||||
%% been stored, if synchronization is needed.
|
||||
-spec ensure_worker_present(worker_id(), map()) ->
|
||||
{ok, supervisor:child()}.
|
||||
ensure_worker_present(Id, Config) ->
|
||||
ChildSpec = jwt_worker_child_spec(Id, Config),
|
||||
case supervisor:start_child(?MODULE, ChildSpec) of
|
||||
{ok, Pid} ->
|
||||
{ok, Pid};
|
||||
{error, {already_started, Pid}} ->
|
||||
{ok, Pid};
|
||||
{error, already_present} ->
|
||||
supervisor:restart_child(?MODULE, Id)
|
||||
end.
|
||||
|
||||
%% @doc Stops a given JWT worker by its id.
|
||||
-spec ensure_worker_deleted(worker_id()) -> ok.
|
||||
ensure_worker_deleted(Id) ->
|
||||
case supervisor:terminate_child(?MODULE, Id) of
|
||||
ok -> ok;
|
||||
{error, not_found} -> ok
|
||||
end.
|
||||
|
||||
jwt_worker_child_spec(Id, Config) ->
|
||||
#{ id => Id
|
||||
, start => {emqx_rule_engine_jwt_worker, start_link, [Config]}
|
||||
, restart => transient
|
||||
, type => worker
|
||||
, significant => false
|
||||
, shutdown => brutal_kill
|
||||
, modules => [emqx_rule_engine_jwt_worker]
|
||||
}.
|
||||
|
||||
-spec ensure_jwt_table() -> ok.
|
||||
ensure_jwt_table() ->
|
||||
case ets:whereis(?JWT_TABLE) of
|
||||
undefined ->
|
||||
Opts = [named_table, public,
|
||||
{read_concurrency, true}, ordered_set],
|
||||
_ = ets:new(?JWT_TABLE, Opts),
|
||||
ok;
|
||||
_ ->
|
||||
ok
|
||||
end.
|
|
@ -0,0 +1,216 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_rule_engine_jwt_worker).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([ start_link/1
|
||||
, ensure_jwt/1
|
||||
]).
|
||||
|
||||
%% gen_server API
|
||||
-export([ init/1
|
||||
, handle_continue/2
|
||||
, handle_call/3
|
||||
, handle_cast/2
|
||||
, handle_info/2
|
||||
, format_status/1
|
||||
, format_status/2
|
||||
]).
|
||||
|
||||
-include_lib("jose/include/jose_jwk.hrl").
|
||||
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
|
||||
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-type config() :: #{ private_key := binary()
|
||||
, resource_id := resource_id()
|
||||
, expiration := timer:time()
|
||||
, table := ets:table()
|
||||
, iss := binary()
|
||||
, sub := binary()
|
||||
, aud := binary()
|
||||
, kid := binary()
|
||||
, alg := binary()
|
||||
}.
|
||||
-type jwt() :: binary().
|
||||
-type state() :: #{ refresh_timer := undefined | timer:tref()
|
||||
, resource_id := resource_id()
|
||||
, expiration := timer:time()
|
||||
, table := ets:table()
|
||||
, jwt := undefined | jwt()
|
||||
%% only undefined during startup
|
||||
, jwk := undefined | jose_jwk:key()
|
||||
, iss := binary()
|
||||
, sub := binary()
|
||||
, aud := binary()
|
||||
, kid := binary()
|
||||
, alg := binary()
|
||||
}.
|
||||
|
||||
-define(refresh_jwt, refresh_jwt).
|
||||
|
||||
%%-----------------------------------------------------------------------------------------
|
||||
%% API
|
||||
%%-----------------------------------------------------------------------------------------
|
||||
|
||||
-spec start_link(config()) -> gen_server:start_ret().
|
||||
start_link(#{ private_key := _
|
||||
, expiration := _
|
||||
, resource_id := _
|
||||
, table := _
|
||||
, iss := _
|
||||
, sub := _
|
||||
, aud := _
|
||||
, kid := _
|
||||
, alg := _
|
||||
} = Config) ->
|
||||
gen_server:start_link(?MODULE, Config, []).
|
||||
|
||||
-spec ensure_jwt(pid()) -> reference().
|
||||
ensure_jwt(Worker) ->
|
||||
Ref = alias([reply]),
|
||||
gen_server:cast(Worker, {ensure_jwt, Ref}),
|
||||
Ref.
|
||||
|
||||
%%-----------------------------------------------------------------------------------------
|
||||
%% gen_server API
|
||||
%%-----------------------------------------------------------------------------------------
|
||||
|
||||
-spec init(config()) -> {ok, state(), {continue, {make_key, binary()}}}
|
||||
| {stop, {error, term()}}.
|
||||
init(#{private_key := PrivateKeyPEM} = Config) ->
|
||||
State0 = maps:without([private_key], Config),
|
||||
State = State0#{ jwk => undefined
|
||||
, jwt => undefined
|
||||
, refresh_timer => undefined
|
||||
},
|
||||
{ok, State, {continue, {make_key, PrivateKeyPEM}}}.
|
||||
|
||||
handle_continue({make_key, PrivateKeyPEM}, State0) ->
|
||||
case jose_jwk:from_pem(PrivateKeyPEM) of
|
||||
JWK = #jose_jwk{} ->
|
||||
State = State0#{jwk := JWK},
|
||||
{noreply, State, {continue, create_token}};
|
||||
[] ->
|
||||
?tp(rule_engine_jwt_worker_startup_error, #{error => empty_key}),
|
||||
{stop, {shutdown, {error, empty_key}}, State0};
|
||||
{error, Reason} ->
|
||||
Error = {invalid_private_key, Reason},
|
||||
?tp(rule_engine_jwt_worker_startup_error, #{error => Error}),
|
||||
{stop, {shutdown, {error, Error}}, State0};
|
||||
Error0 ->
|
||||
Error = {invalid_private_key, Error0},
|
||||
?tp(rule_engine_jwt_worker_startup_error, #{error => Error}),
|
||||
{stop, {shutdown, {error, Error}}, State0}
|
||||
end;
|
||||
handle_continue(create_token, State0) ->
|
||||
State = generate_and_store_jwt(State0),
|
||||
{noreply, State}.
|
||||
|
||||
handle_call(_Req, _From, State) ->
|
||||
{reply, {error, bad_call}, State}.
|
||||
|
||||
handle_cast({ensure_jwt, From}, State0 = #{jwt := JWT}) ->
|
||||
State =
|
||||
case JWT of
|
||||
undefined ->
|
||||
generate_and_store_jwt(State0);
|
||||
_ ->
|
||||
State0
|
||||
end,
|
||||
From ! {From, token_created},
|
||||
{noreply, State};
|
||||
handle_cast(_Req, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) ->
|
||||
State = generate_and_store_jwt(State0),
|
||||
{noreply, State};
|
||||
handle_info(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
format_status(State) ->
|
||||
censor_secrets(State).
|
||||
|
||||
format_status(_Opt, [_PDict, State0]) ->
|
||||
State = censor_secrets(State0),
|
||||
[{data, [{"State", State}]}].
|
||||
|
||||
%%-----------------------------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%-----------------------------------------------------------------------------------------
|
||||
|
||||
-spec do_generate_jwt(state()) -> jwt().
|
||||
do_generate_jwt(#{ expiration := ExpirationMS
|
||||
, iss := Iss
|
||||
, sub := Sub
|
||||
, aud := Aud
|
||||
, kid := KId
|
||||
, alg := Alg
|
||||
, jwk := JWK
|
||||
} = _State) ->
|
||||
Headers = #{ <<"alg">> => Alg
|
||||
, <<"kid">> => KId
|
||||
},
|
||||
Now = erlang:system_time(seconds),
|
||||
ExpirationS = erlang:convert_time_unit(ExpirationMS, millisecond, second),
|
||||
Claims = #{ <<"iss">> => Iss
|
||||
, <<"sub">> => Sub
|
||||
, <<"aud">> => Aud
|
||||
, <<"iat">> => Now
|
||||
, <<"exp">> => Now + ExpirationS
|
||||
},
|
||||
JWT0 = jose_jwt:sign(JWK, Headers, Claims),
|
||||
{_, JWT} = jose_jws:compact(JWT0),
|
||||
JWT.
|
||||
|
||||
-spec generate_and_store_jwt(state()) -> state().
|
||||
generate_and_store_jwt(State0) ->
|
||||
JWT = do_generate_jwt(State0),
|
||||
store_jwt(State0, JWT),
|
||||
?tp(rule_engine_jwt_worker_refresh, #{jwt => JWT}),
|
||||
State1 = State0#{jwt := JWT},
|
||||
ensure_timer(State1).
|
||||
|
||||
-spec store_jwt(state(), jwt()) -> ok.
|
||||
store_jwt(#{resource_id := ResourceId, table := TId}, JWT) ->
|
||||
true = ets:insert(TId, {{ResourceId, jwt}, JWT}),
|
||||
?tp(rule_engine_jwt_worker_token_stored, #{resource_id => ResourceId}),
|
||||
ok.
|
||||
|
||||
-spec ensure_timer(state()) -> state().
|
||||
ensure_timer(State = #{ refresh_timer := undefined
|
||||
, expiration := ExpirationMS0
|
||||
}) ->
|
||||
ExpirationMS = max(5_000, ExpirationMS0 - 5_000),
|
||||
TRef = erlang:start_timer(ExpirationMS, self(), ?refresh_jwt),
|
||||
State#{refresh_timer => TRef};
|
||||
ensure_timer(State) ->
|
||||
State.
|
||||
|
||||
-spec censor_secrets(state()) -> map().
|
||||
censor_secrets(State) ->
|
||||
maps:map(
|
||||
fun(Key, _Value) when Key =:= jwt;
|
||||
Key =:= jwk ->
|
||||
"******";
|
||||
(_Key, Value) ->
|
||||
Value
|
||||
end,
|
||||
State).
|
|
@ -22,7 +22,9 @@
|
|||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([start_locker/0]).
|
||||
-export([ start_locker/0
|
||||
, start_jwt_sup/0
|
||||
]).
|
||||
|
||||
-export([init/1]).
|
||||
|
||||
|
@ -31,8 +33,12 @@ start_link() ->
|
|||
|
||||
init([]) ->
|
||||
Opts = [public, named_table, set, {read_concurrency, true}],
|
||||
_ = ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]),
|
||||
_ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
|
||||
ensure_table(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]),
|
||||
ensure_table(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
|
||||
SupFlags = #{ strategy => one_for_one
|
||||
, intensity => 10
|
||||
, period => 10
|
||||
},
|
||||
Registry = #{id => emqx_rule_registry,
|
||||
start => {emqx_rule_registry, start_link, []},
|
||||
restart => permanent,
|
||||
|
@ -51,7 +57,8 @@ init([]) ->
|
|||
shutdown => 5000,
|
||||
type => worker,
|
||||
modules => [emqx_rule_monitor]},
|
||||
{ok, {{one_for_one, 10, 10}, [Registry, Metrics, Monitor]}}.
|
||||
JWTSup = jwt_sup_child_spec(),
|
||||
{ok, {SupFlags, [Registry, Metrics, Monitor, JWTSup]}}.
|
||||
|
||||
start_locker() ->
|
||||
Locker = #{id => emqx_rule_locker,
|
||||
|
@ -61,3 +68,32 @@ start_locker() ->
|
|||
type => worker,
|
||||
modules => [emqx_rule_locker]},
|
||||
supervisor:start_child(?MODULE, Locker).
|
||||
|
||||
start_jwt_sup() ->
|
||||
JWTSup = jwt_sup_child_spec(),
|
||||
supervisor:start_child(?MODULE, JWTSup).
|
||||
|
||||
jwt_sup_child_spec() ->
|
||||
#{ id => emqx_rule_engine_jwt_sup
|
||||
, start => {emqx_rule_engine_jwt_sup, start_link, []}
|
||||
, type => supervisor
|
||||
, restart => permanent
|
||||
, shutdown => 5_000
|
||||
, modules => [emqx_rule_engine_jwt_sup]
|
||||
}.
|
||||
|
||||
ensure_table(Name, Opts) ->
|
||||
try
|
||||
case ets:whereis(name) of
|
||||
undefined ->
|
||||
_ = ets:new(Name, Opts),
|
||||
ok;
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
catch
|
||||
%% stil the table exists (somehow can happen in hot-upgrade,
|
||||
%% it seems).
|
||||
error:badarg ->
|
||||
ok
|
||||
end.
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("rule_engine.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
%% API functions
|
||||
-export([ start_link/0
|
||||
|
@ -222,7 +223,9 @@ inc(Id, Metric, Val) ->
|
|||
counters:add(couters_ref(Id), metrics_idx(Metric), Val);
|
||||
Ref ->
|
||||
counters:add(Ref, metrics_idx(Metric), Val)
|
||||
end.
|
||||
end,
|
||||
?tp(rule_metrics_inc, #{id => Id, metric => Metric, value => Val}),
|
||||
ok.
|
||||
|
||||
inc_actions_taken(Id) ->
|
||||
inc_actions_taken(Id, 1).
|
||||
|
|
|
@ -0,0 +1,248 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_rule_engine_jwt_worker_SUITE).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
|
||||
-include_lib("jose/include/jose_jwt.hrl").
|
||||
-include_lib("jose/include/jose_jws.hrl").
|
||||
|
||||
-compile([export_all, nowarn_export_all]).
|
||||
|
||||
%%-----------------------------------------------------------------------------
|
||||
%% CT boilerplate
|
||||
%%-----------------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok.
|
||||
|
||||
%%-----------------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%-----------------------------------------------------------------------------
|
||||
|
||||
generate_private_key_pem() ->
|
||||
PublicExponent = 65537,
|
||||
Size = 2048,
|
||||
Key = public_key:generate_key({rsa, Size, PublicExponent}),
|
||||
DERKey = public_key:der_encode('PrivateKeyInfo', Key),
|
||||
public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]).
|
||||
|
||||
generate_config() ->
|
||||
PrivateKeyPEM = generate_private_key_pem(),
|
||||
ResourceID = emqx_guid:gen(),
|
||||
#{ private_key => PrivateKeyPEM
|
||||
, expiration => timer:hours(1)
|
||||
, resource_id => ResourceID
|
||||
, table => ets:new(test_jwt_table, [ordered_set, public])
|
||||
, iss => <<"issuer">>
|
||||
, sub => <<"subject">>
|
||||
, aud => <<"audience">>
|
||||
, kid => <<"key id">>
|
||||
, alg => <<"RS256">>
|
||||
}.
|
||||
|
||||
is_expired(JWT) ->
|
||||
#jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT),
|
||||
Now = erlang:system_time(seconds),
|
||||
Now >= Exp.
|
||||
|
||||
%%-----------------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%%-----------------------------------------------------------------------------
|
||||
|
||||
t_create_success(_Config) ->
|
||||
Config = generate_config(),
|
||||
Res = emqx_rule_engine_jwt_worker:start_link(Config),
|
||||
?assertMatch({ok, _}, Res),
|
||||
{ok, Worker} = Res,
|
||||
Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Worker),
|
||||
receive
|
||||
{Ref, token_created} ->
|
||||
ok
|
||||
after
|
||||
1_000 ->
|
||||
ct:fail("should have confirmed token creation; msgs: ~0p",
|
||||
[process_info(self(), messages)])
|
||||
end,
|
||||
ok.
|
||||
|
||||
t_empty_key(_Config) ->
|
||||
Config0 = generate_config(),
|
||||
Config = Config0#{private_key := <<>>},
|
||||
process_flag(trap_exit, true),
|
||||
?check_trace(
|
||||
?wait_async_action(
|
||||
?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config)),
|
||||
#{?snk_kind := rule_engine_jwt_worker_startup_error},
|
||||
1_000),
|
||||
fun(Trace) ->
|
||||
?assertMatch([#{error := empty_key}],
|
||||
?of_kind(rule_engine_jwt_worker_startup_error, Trace)),
|
||||
ok
|
||||
end),
|
||||
ok.
|
||||
|
||||
t_invalid_pem(_Config) ->
|
||||
Config0 = generate_config(),
|
||||
InvalidPEM = public_key:pem_encode([{'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted},
|
||||
{'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted}]),
|
||||
Config = Config0#{private_key := InvalidPEM},
|
||||
process_flag(trap_exit, true),
|
||||
?check_trace(
|
||||
?wait_async_action(
|
||||
?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config)),
|
||||
#{?snk_kind := rule_engine_jwt_worker_startup_error},
|
||||
1_000),
|
||||
fun(Trace) ->
|
||||
?assertMatch([#{error := {invalid_private_key, _}}],
|
||||
?of_kind(rule_engine_jwt_worker_startup_error, Trace)),
|
||||
ok
|
||||
end),
|
||||
ok.
|
||||
|
||||
t_refresh(_Config) ->
|
||||
Config0 = #{ table := Table
|
||||
, resource_id := ResourceId
|
||||
} = generate_config(),
|
||||
Config = Config0#{expiration => 5_000},
|
||||
?check_trace(
|
||||
begin
|
||||
{{ok, _Pid}, {ok, _Event}} =
|
||||
?wait_async_action(
|
||||
emqx_rule_engine_jwt_worker:start_link(Config),
|
||||
#{?snk_kind := rule_engine_jwt_worker_token_stored},
|
||||
5_000),
|
||||
{ok, FirstJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId),
|
||||
?block_until(#{?snk_kind := rule_engine_jwt_worker_refresh,
|
||||
jwt := JWT0} when JWT0 =/= FirstJWT, 15_000),
|
||||
{ok, SecondJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId),
|
||||
?assertNot(is_expired(SecondJWT)),
|
||||
?assert(is_expired(FirstJWT)),
|
||||
{FirstJWT, SecondJWT}
|
||||
end,
|
||||
fun({FirstJWT, SecondJWT}, Trace) ->
|
||||
?assertMatch([_, _ | _],
|
||||
?of_kind(rule_engine_jwt_worker_token_stored, Trace)),
|
||||
?assertNotEqual(FirstJWT, SecondJWT),
|
||||
ok
|
||||
end),
|
||||
ok.
|
||||
|
||||
t_format_status(_Config) ->
|
||||
Config = generate_config(),
|
||||
{ok, Pid} = emqx_rule_engine_jwt_worker:start_link(Config),
|
||||
{status, _, _, Props} = sys:get_status(Pid),
|
||||
[State] = [State
|
||||
|| Info = [_ | _] <- Props,
|
||||
{data, Data = [_ | _]} <- Info,
|
||||
{"State", State} <- Data],
|
||||
?assertMatch(
|
||||
#{ jwt := "******"
|
||||
, jwk := "******"
|
||||
},
|
||||
State),
|
||||
ok.
|
||||
|
||||
t_lookup_ok(_Config) ->
|
||||
Config = #{ table := Table
|
||||
, resource_id := ResourceId
|
||||
, private_key := PrivateKeyPEM
|
||||
, aud := Aud
|
||||
, iss := Iss
|
||||
, sub := Sub
|
||||
, kid := KId
|
||||
} = generate_config(),
|
||||
{ok, Worker} = emqx_rule_engine_jwt_worker:start_link(Config),
|
||||
Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Worker),
|
||||
receive
|
||||
{Ref, token_created} ->
|
||||
ok
|
||||
after
|
||||
500 ->
|
||||
error(timeout)
|
||||
end,
|
||||
Res = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId),
|
||||
?assertMatch({ok, _}, Res),
|
||||
{ok, JWT} = Res,
|
||||
?assert(is_binary(JWT)),
|
||||
JWK = jose_jwk:from_pem(PrivateKeyPEM),
|
||||
{IsValid, ParsedJWT, JWS} = jose_jwt:verify_strict(JWK, [<<"RS256">>], JWT),
|
||||
?assertMatch(
|
||||
#jose_jwt{
|
||||
fields = #{ <<"aud">> := Aud
|
||||
, <<"iss">> := Iss
|
||||
, <<"sub">> := Sub
|
||||
, <<"exp">> := _
|
||||
, <<"iat">> := _
|
||||
}},
|
||||
ParsedJWT),
|
||||
?assertNot(is_expired(JWT)),
|
||||
?assertMatch(
|
||||
#jose_jws{
|
||||
alg = {_, 'RS256'},
|
||||
fields = #{ <<"kid">> := KId
|
||||
, <<"typ">> := <<"JWT">>
|
||||
}},
|
||||
JWS),
|
||||
?assert(IsValid),
|
||||
ok.
|
||||
|
||||
t_lookup_not_found(_Config) ->
|
||||
Table = ets:new(test_jwt_table, [ordered_set, public]),
|
||||
InexistentResource = <<"xxx">>,
|
||||
?assertEqual({error, not_found},
|
||||
emqx_rule_engine_jwt:lookup_jwt(Table, InexistentResource)),
|
||||
ok.
|
||||
|
||||
t_lookup_badarg(_Config) ->
|
||||
InexistentTable = i_dont_exist,
|
||||
InexistentResource = <<"xxx">>,
|
||||
?assertEqual({error, not_found},
|
||||
emqx_rule_engine_jwt:lookup_jwt(InexistentTable, InexistentResource)),
|
||||
ok.
|
||||
|
||||
t_start_supervised_worker(_Config) ->
|
||||
{ok, _} = emqx_rule_engine_jwt_sup:start_link(),
|
||||
Config = #{resource_id := ResourceId} = generate_config(),
|
||||
{ok, Pid} = emqx_rule_engine_jwt_sup:ensure_worker_present(ResourceId, Config),
|
||||
Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Pid),
|
||||
receive
|
||||
{Ref, token_created} ->
|
||||
ok
|
||||
after
|
||||
5_000 ->
|
||||
ct:fail("timeout")
|
||||
end,
|
||||
MRef = monitor(process, Pid),
|
||||
?assert(is_process_alive(Pid)),
|
||||
ok = emqx_rule_engine_jwt_sup:ensure_worker_deleted(ResourceId),
|
||||
receive
|
||||
{'DOWN', MRef, process, Pid, _} ->
|
||||
ok
|
||||
after
|
||||
1_000 ->
|
||||
ct:fail("timeout")
|
||||
end,
|
||||
ok.
|
|
@ -136,16 +136,22 @@ t_preproc_sql5(_) ->
|
|||
emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)).
|
||||
|
||||
t_if_contains_placeholder(_) ->
|
||||
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}">>)),
|
||||
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}${b}">>)),
|
||||
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a},${b},${c}">>)),
|
||||
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a}">>)),
|
||||
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a},b:${b}">>)),
|
||||
?assert(emqx_rule_utils:if_contains_placeholder(<<"abc${ab}">>)),
|
||||
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a">>)),
|
||||
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc$">>)),
|
||||
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${">>)),
|
||||
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${a">>)),
|
||||
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${ab">>)),
|
||||
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a${ab${c${e">>)),
|
||||
ok.
|
||||
TestTab =
|
||||
[ {true, "${a}"}
|
||||
, {true, "${a}${b}"}
|
||||
, {true, "${a},${b},${c}"}
|
||||
, {true, "a:${a}"}
|
||||
, {true, "a:${a},b:${b}"}
|
||||
, {true, "abc${ab}"}
|
||||
, {true, "a${ab${c}${e"}
|
||||
, {false, "a"}
|
||||
, {false, "abc$"}
|
||||
, {false, "abc${"}
|
||||
, {false, "abc${a"}
|
||||
, {false, "abc${ab"}
|
||||
, {false, "a${ab${c${e"}
|
||||
],
|
||||
lists:foreach(fun({Expected, InputStr}) ->
|
||||
?assert(Expected =:= emqx_rule_utils:if_contains_placeholder(InputStr)),
|
||||
?assert(Expected =:= emqx_rule_utils:if_contains_placeholder(iolist_to_binary(InputStr)))
|
||||
end, TestTab).
|
||||
|
|
|
@ -56,7 +56,10 @@ groups() ->
|
|||
{rulesql_select_events, [],
|
||||
[ t_sqlparse_event_client_connected_01
|
||||
, t_sqlparse_event_client_connected_02
|
||||
, t_sqlparse_event_client_disconnected
|
||||
, t_sqlparse_event_client_disconnected_normal
|
||||
, t_sqlparse_event_client_disconnected_kicked
|
||||
, t_sqlparse_event_client_disconnected_discarded
|
||||
, t_sqlparse_event_client_disconnected_takeovered
|
||||
, t_sqlparse_event_session_subscribed
|
||||
, t_sqlparse_event_session_unsubscribed
|
||||
, t_sqlparse_event_message_delivered
|
||||
|
@ -145,6 +148,12 @@ end_per_group(_Groupname, _Config) ->
|
|||
%% Testcase specific setup/teardown
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
init_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) ->
|
||||
application:set_env(emqx, client_disconnect_discarded, true),
|
||||
Config;
|
||||
init_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) ->
|
||||
application:set_env(emqx, client_disconnect_takeovered, true),
|
||||
Config;
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
init_events_counters(),
|
||||
ok = emqx_rule_registry:register_resource_types(
|
||||
|
@ -152,6 +161,12 @@ init_per_testcase(_TestCase, Config) ->
|
|||
%ct:pal("============ ~p", [ets:tab2list(emqx_resource_type)]),
|
||||
Config.
|
||||
|
||||
end_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) ->
|
||||
application:set_env(emqx, client_disconnect_takeovered, false), %% back to default
|
||||
Config;
|
||||
end_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) ->
|
||||
application:set_env(emqx, client_disconnect_discarded, false), %% back to default
|
||||
Config;
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
ok.
|
||||
|
||||
|
@ -523,9 +538,118 @@ t_sqlparse_event_client_connected_02(_Config) ->
|
|||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
%% FROM $events/client_disconnected
|
||||
t_sqlparse_event_client_disconnected(_Config) ->
|
||||
%% TODO
|
||||
ok.
|
||||
t_sqlparse_event_client_disconnected_normal(_Config) ->
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
Sql = "select * "
|
||||
"from \"$events/client_disconnected\" ",
|
||||
RepubT = <<"repub/to/disconnected/normal">>,
|
||||
|
||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||
|
||||
{ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
{ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
|
||||
ct:sleep(200),
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
emqtt:disconnect(Client1),
|
||||
|
||||
receive {publish, #{topic := T, payload := Payload}} ->
|
||||
?assertEqual(RepubT, T),
|
||||
?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps]))
|
||||
after 1000 ->
|
||||
ct:fail(wait_for_repub_disconnected_normal)
|
||||
end,
|
||||
emqtt:stop(Client),
|
||||
|
||||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
t_sqlparse_event_client_disconnected_kicked(_Config) ->
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
Sql = "select * "
|
||||
"from \"$events/client_disconnected\" ",
|
||||
RepubT = <<"repub/to/disconnected/kicked">>,
|
||||
|
||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||
|
||||
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||
{ok, _} = emqtt:connect(ClientRecvRepub),
|
||||
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
||||
ct:sleep(200),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
emqx_cm:kick_session(<<"emqx">>),
|
||||
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
||||
|
||||
receive {publish, #{topic := T, payload := Payload}} ->
|
||||
?assertEqual(RepubT, T),
|
||||
?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps]))
|
||||
after 1000 ->
|
||||
ct:fail(wait_for_repub_disconnected_kicked)
|
||||
end,
|
||||
emqtt:stop(ClientRecvRepub),
|
||||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
t_sqlparse_event_client_disconnected_discarded(_Config) ->
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
Sql = "select * "
|
||||
"from \"$events/client_disconnected\" ",
|
||||
RepubT = <<"repub/to/disconnected/discarded">>,
|
||||
|
||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||
|
||||
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||
{ok, _} = emqtt:connect(ClientRecvRepub),
|
||||
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
||||
ct:sleep(200),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
|
||||
receive {publish, #{topic := T, payload := Payload}} ->
|
||||
?assertEqual(RepubT, T),
|
||||
?assertMatch(#{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps]))
|
||||
after 1000 ->
|
||||
ct:fail(wait_for_repub_disconnected_discarded)
|
||||
end,
|
||||
|
||||
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
|
||||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
t_sqlparse_event_client_disconnected_takeovered(_Config) ->
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
Sql = "select * "
|
||||
"from \"$events/client_disconnected\" ",
|
||||
RepubT = <<"repub/to/disconnected/takeovered">>,
|
||||
|
||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||
|
||||
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||
{ok, _} = emqtt:connect(ClientRecvRepub),
|
||||
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
||||
ct:sleep(200),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
|
||||
receive {publish, #{topic := T, payload := Payload}} ->
|
||||
?assertEqual(RepubT, T),
|
||||
?assertMatch(#{<<"reason">> := <<"takeovered">>}, emqx_json:decode(Payload, [return_maps]))
|
||||
after 1000 ->
|
||||
ct:fail(wait_for_repub_disconnected_discarded)
|
||||
end,
|
||||
|
||||
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
|
||||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
%% FROM $events/session_subscribed
|
||||
t_sqlparse_event_session_subscribed(_Config) ->
|
||||
|
|
10
build
10
build
|
@ -67,7 +67,14 @@ make_rel() {
|
|||
}
|
||||
|
||||
relup_db() {
|
||||
./scripts/relup-base-vsns.escript "$@" ./data/relup-paths.eterm
|
||||
case "$PROFILE" in
|
||||
*-ee*)
|
||||
./scripts/relup-base-vsns.escript "$@" ./data/relup-paths-ee.eterm
|
||||
;;
|
||||
*)
|
||||
./scripts/relup-base-vsns.escript "$@" ./data/relup-paths.eterm
|
||||
;;
|
||||
esac
|
||||
}
|
||||
|
||||
## unzip previous version .zip files to _build/$PROFILE/rel/emqx/releases before making relup
|
||||
|
@ -84,7 +91,6 @@ make_relup() {
|
|||
if [[ "$OTP_BASE" != "$OTP_VSN" ]]; then
|
||||
OTP_CHANGED='yes'
|
||||
fi
|
||||
OTP_BASE=$(relup_db otp-vsn-for "$PKG_VSN")
|
||||
zip_file="_upgrade_base/${PROFILE}-$(env OTP_VSN="$OTP_BASE" PKG_VSN="$BASE_VSN" ./scripts/pkg-full-vsn.sh 'vsn_exact').zip"
|
||||
if [ ! -d "$releases_dir/$BASE_VSN" ]; then
|
||||
local tmp_dir
|
||||
|
|
|
@ -36,9 +36,16 @@
|
|||
For example: `acl_order = jwt,http`, this will make sure `jwt` is always checked before `http`,
|
||||
meaning if JWT is not found (or no `acl` cliam) for a client, then the ACL check will fallback to use the HTTP backend.
|
||||
|
||||
|
||||
- Added configurations to enable more `client.disconnected` events (and counter bumps) [#9267](https://github.com/emqx/emqx/pull/9267).
|
||||
Prior to this change, the `client.disconnected` event (and counter bump) is triggered when a client
|
||||
performs a 'normal' disconnect, or is 'kicked' by system admin, but NOT triggered when a
|
||||
stale connection had to be 'discarded' (for clean session) or 'takenover' (for non-clean session).
|
||||
Now it is possible to set configs `broker.client_disconnect_discarded` and `broker.client_disconnect_takenover` to `on` to enable the event in these scenarios.
|
||||
|
||||
## Bug fixes
|
||||
|
||||
- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
|
||||
- Fix that after uploading a backup file with an non-ASCII filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
|
||||
|
||||
- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places [#9185](https://github.com/emqx/emqx/pull/9185).
|
||||
This is to avoid displaying floats like `0.30000000000000004` on the dashboard.
|
||||
|
@ -62,3 +69,5 @@
|
|||
- Make sure Rule-Engine API supports Percent-encoding `rule_id` and `resource_id` in HTTP request path [#9190](https://github.com/emqx/emqx/pull/9190).
|
||||
Note that the `id` in `POST /api/v4/rules` should be literals (not encoded) when creating a `rule` or `resource`.
|
||||
See docs [Create Rule](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) [Create Resource](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources).
|
||||
|
||||
- When republishing messages or bridge messages to other brokers, check the validity of the topic and make sure it does not have topic wildcards [#9291](https://github.com/emqx/emqx/pull/9291).
|
||||
|
|
|
@ -32,9 +32,14 @@
|
|||
例如,`acl_order = jwt,http`,可以用于保证 `jwt` 这个模块总是排在 `http` 的前面,
|
||||
也就是说,在对客户端进行 ACL 检查时,如果 JWT 不存在(或者没有定义 ACL),那么回退到使用 HTTP。
|
||||
|
||||
- 为更多类型的 `client.disconnected` 事件(计数器触发)提供可配置项 [#9267](https://github.com/emqx/emqx/pull/9267)。
|
||||
此前,`client.disconnected` 事件及计数器仅会在客户端正常断开连接或客户端被系统管理员踢出时触发,
|
||||
但不会在旧 session 被废弃 (clean_session = true) 或旧 session 被接管 (clean_session = false) 时被触发。
|
||||
可将 `broker.client_disconnect_discarded` 和 `broker.client_disconnect_takovered` 选项设置为 `on` 来启用此场景下的客户端断连事件。
|
||||
|
||||
## 修复
|
||||
|
||||
- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
|
||||
- 修复若上传的备份文件名中包含非 ASCII 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
|
||||
|
||||
- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)。
|
||||
避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。
|
||||
|
@ -58,3 +63,5 @@
|
|||
- 使规则引擎 API 在 HTTP 请求路径中支持百分号编码的 `rule_id` 及 `resource_id` [#9190](https://github.com/emqx/emqx/pull/9190)。
|
||||
注意在创建规则或资源时,HTTP body 中的 `id` 字段仍为字面值,而不是编码之后的值。
|
||||
详情请参考 [创建规则](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) 和 [创建资源](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources)。
|
||||
|
||||
- 在进行消息重发布或桥接消息到其他 mqtt broker 时,检查 topic 合法性,确定其不带有主题通配符 [#9291](https://github.com/emqx/emqx/pull/9291)。
|
||||
|
|
|
@ -18,6 +18,10 @@
|
|||
- Added support for specifying custom modules for custom authentication [#9297](https://github.com/emqx/emqx/pull/9297).
|
||||
To support simple authentication rules, it is no longer necessary to implement a full-blown plugin.
|
||||
|
||||
- Added a JWT management for Rule-Engin, for creating and refreshing JWT tokens in rule engine actions [#9241](https://github.com/emqx/emqx/pull/9241).
|
||||
This feature is so far only used in EMQX Enterprise Google PubSub integration.
|
||||
Can be used as webhook integration's JWT authenticationa against the webhook service endpoint.
|
||||
|
||||
### Bug fixes
|
||||
|
||||
- Fix get trace list crash when trace not initialize. [#9156](https://github.com/emqx/emqx/pull/9156)
|
||||
|
|
|
@ -15,6 +15,10 @@
|
|||
- 增加了可定制的认证回调模块 [#9297](https://github.com/emqx/emqx/pull/9297)。
|
||||
对于一些简单的认证检查,不需要去实现一个完整的认证插件。
|
||||
|
||||
- 为规则引擎增加了一个 JWT 令牌管理,用于在规则引擎动作中创建和刷新 JWT 令牌 [#9241](https://github.com/emqx/emqx/pull/9241)。
|
||||
该功能现在仅用于 EMQX 企业版的 Google PubSub 集成中。
|
||||
后续会用于 webhook 集成的 JWT 认证。
|
||||
|
||||
### 修复
|
||||
|
||||
- 修复日志追踪模块没开启时,GET Trace 列表接口报错的问题。[#9156](https://github.com/emqx/emqx/pull/9156)
|
||||
|
|
|
@ -2504,6 +2504,16 @@ broker.route_batch_clean = off
|
|||
## - false: disable trie path compaction
|
||||
# broker.perf.trie_compaction = true
|
||||
|
||||
## Enable client disconnect event will be triggered by which reasons.
|
||||
## Value: on | off
|
||||
## `takeover`: session was takenover by another client with same client ID. (clean_session = false)
|
||||
## Default: off
|
||||
## `discard`: session was takeover by another client with same client ID. (clean_session = true)
|
||||
## Default: off
|
||||
##
|
||||
# broker.client_disconnect_discarded = off
|
||||
# broker.client_disconnect_takeovered = off
|
||||
|
||||
## CONFIG_SECTION_BGN=sys_mon ==================================================
|
||||
|
||||
## Enable Long GC monitoring. Disable if the value is 0.
|
||||
|
|
|
@ -23,6 +23,18 @@
|
|||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
%% do not start the application
|
||||
%% only testing the root supervisor in this suite
|
||||
application:stop(emqx_modules),
|
||||
{ok, Pid} = emqx_mod_sup:start_link(),
|
||||
unlink(Pid),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
exit(whereis(emqx_mod_sup), kill),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -2669,6 +2669,20 @@ end}.
|
|||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
||||
%% @doc Configuration of disconnected event reason.
|
||||
%% `takeover`: session was takenover by another client with same client ID. (clean_session = false)
|
||||
%% `discard`: session was takeover by another client with same client ID. (clean_session = true)
|
||||
{mapping, "broker.client_disconnect_discarded", "emqx.client_disconnect_discarded", [
|
||||
{default, off},
|
||||
{datatype, flag}
|
||||
]}.
|
||||
|
||||
{mapping, "broker.client_disconnect_takeovered", "emqx.client_disconnect_takeovered", [
|
||||
{default, off},
|
||||
{datatype, flag}
|
||||
]}.
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% System Monitor
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -5,6 +5,29 @@ set -euo pipefail
|
|||
# ensure dir
|
||||
cd -P -- "$(dirname -- "$0")/.."
|
||||
|
||||
help() {
|
||||
echo
|
||||
echo "-h|--help: To display this usage info"
|
||||
echo "--json: Print apps in json"
|
||||
}
|
||||
|
||||
WANT_JSON='no'
|
||||
while [ "$#" -gt 0 ]; do
|
||||
case $1 in
|
||||
-h|--help)
|
||||
help
|
||||
exit 0
|
||||
;;
|
||||
--json)
|
||||
WANT_JSON='yes'
|
||||
shift 1
|
||||
;;
|
||||
*)
|
||||
echo "unknown option $1"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
done
|
||||
if [ "$(./scripts/get-distro.sh)" = 'windows' ]; then
|
||||
# Otherwise windows may resolve to find.exe
|
||||
FIND="/usr/bin/find"
|
||||
|
@ -17,17 +40,26 @@ find_app() {
|
|||
"$FIND" "${appdir}" -mindepth 1 -maxdepth 1 -type d
|
||||
}
|
||||
|
||||
# append emqx application first
|
||||
echo 'emqx'
|
||||
EM="emqx"
|
||||
CE="$(find_app 'apps')"
|
||||
|
||||
find_app 'apps'
|
||||
if [ -f 'EMQX_ENTERPRISE' ]; then
|
||||
find_app 'lib-ee'
|
||||
LIB="$(find_app 'lib-ee')"
|
||||
else
|
||||
find_app 'lib-ce'
|
||||
LIB="$(find_app 'lib-ce')"
|
||||
fi
|
||||
|
||||
## find directories in lib-extra
|
||||
find_app 'lib-extra'
|
||||
LIBE="$(find_app 'lib-extra')"
|
||||
|
||||
## find symlinks in lib-extra
|
||||
"$FIND" 'lib-extra' -mindepth 1 -maxdepth 1 -type l -exec test -e {} \; -print
|
||||
LIBES="$("$FIND" 'lib-extra' -mindepth 1 -maxdepth 1 -type l -exec test -e {} \; -print)"
|
||||
|
||||
APPS_ALL="$(echo -e "${EM}\n${CE}\n${LIB}\n${LIBE}\n${LIBES}")"
|
||||
|
||||
if [ "$WANT_JSON" = 'yes' ]; then
|
||||
echo "${APPS_ALL}" | xargs | tr -d '\n' | jq -R -s -c 'split(" ")'
|
||||
else
|
||||
echo "${APPS_ALL}"
|
||||
fi
|
||||
|
||||
|
|
|
@ -998,7 +998,13 @@ handle_call(discard, Channel) ->
|
|||
disconnect_and_shutdown(discarded, ok, Channel);
|
||||
|
||||
%% Session Takeover
|
||||
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
|
||||
handle_call({takeover, 'begin'}, Channel = #channel{
|
||||
session = Session,
|
||||
conninfo = #{clientid := ClientId}
|
||||
}) ->
|
||||
?tp(debug,
|
||||
emqx_channel_takeover_begin,
|
||||
#{clientid => ClientId}),
|
||||
reply(Session, Channel#channel{takeover = true});
|
||||
|
||||
handle_call({takeover, 'end'}, Channel = #channel{session = Session,
|
||||
|
@ -1748,7 +1754,16 @@ parse_topic_filters(TopicFilters) ->
|
|||
lists:map(fun emqx_topic:parse/1, TopicFilters).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Ensure disconnected
|
||||
%% Maybe & Ensure disconnected
|
||||
|
||||
ensure_disconnected(connected, Reason, Channel)
|
||||
when Reason =:= discarded orelse Reason =:= takeovered ->
|
||||
case is_disconnect_event_enabled(Reason) of
|
||||
true -> ensure_disconnected(Reason, Channel);
|
||||
false -> Channel
|
||||
end;
|
||||
ensure_disconnected(_, _, Channel) ->
|
||||
Channel.
|
||||
|
||||
ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
|
||||
clientinfo = ClientInfo}) ->
|
||||
|
@ -1845,12 +1860,15 @@ shutdown(Reason, Reply, Channel) ->
|
|||
shutdown(Reason, Reply, Packet, Channel) ->
|
||||
{shutdown, Reason, Reply, Packet, Channel}.
|
||||
|
||||
%% mqtt v5 connected sessions
|
||||
disconnect_and_shutdown(Reason, Reply, Channel = ?IS_MQTT_V5
|
||||
= #channel{conn_state = connected}) ->
|
||||
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel);
|
||||
|
||||
disconnect_and_shutdown(Reason, Reply, Channel) ->
|
||||
shutdown(Reason, Reply, Channel).
|
||||
NChannel = ensure_disconnected(connected, Reason, Channel),
|
||||
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
|
||||
%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions
|
||||
disconnect_and_shutdown(Reason, Reply, Channel= #channel{conn_state = ConnState}) ->
|
||||
NChannel = ensure_disconnected(ConnState, Reason, Channel),
|
||||
shutdown(Reason, Reply, NChannel).
|
||||
|
||||
sp(true) -> 1;
|
||||
sp(false) -> 0.
|
||||
|
@ -1858,6 +1876,11 @@ sp(false) -> 0.
|
|||
flag(true) -> 1;
|
||||
flag(false) -> 0.
|
||||
|
||||
is_disconnect_event_enabled(discarded) ->
|
||||
emqx:get_env(client_disconnect_discarded, false);
|
||||
is_disconnect_event_enabled(takeovered) ->
|
||||
emqx:get_env(client_disconnect_takeovered, false).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% For CT tests
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -825,15 +825,18 @@ t_enrich_connack_caps(_) ->
|
|||
wildcard_subscription => true
|
||||
}
|
||||
end),
|
||||
AckProps = emqx_channel:enrich_connack_caps(#{}, channel()),
|
||||
?assertMatch(#{'Retain-Available' := 1,
|
||||
'Maximum-Packet-Size' := 1024,
|
||||
'Topic-Alias-Maximum' := 10,
|
||||
'Wildcard-Subscription-Available' := 1,
|
||||
'Subscription-Identifier-Available' := 1,
|
||||
'Shared-Subscription-Available' := 1
|
||||
}, AckProps),
|
||||
ok = meck:unload(emqx_mqtt_caps).
|
||||
try
|
||||
AckProps = emqx_channel:enrich_connack_caps(#{}, channel()),
|
||||
?assertMatch(#{'Retain-Available' := 1,
|
||||
'Maximum-Packet-Size' := 1024,
|
||||
'Topic-Alias-Maximum' := 10,
|
||||
'Wildcard-Subscription-Available' := 1,
|
||||
'Subscription-Identifier-Available' := 1,
|
||||
'Shared-Subscription-Available' := 1
|
||||
}, AckProps)
|
||||
after
|
||||
ok = meck:unload(emqx_mqtt_caps)
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases for terminate
|
||||
|
|
|
@ -52,13 +52,15 @@ t_check_sub(_) ->
|
|||
wildcard_subscription => false
|
||||
},
|
||||
emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
|
||||
timer:sleep(50),
|
||||
ClientInfo = #{zone => zone},
|
||||
ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts),
|
||||
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
|
||||
emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)),
|
||||
?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)),
|
||||
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})),
|
||||
emqx_zone:unset_env(zone, '$mqtt_pub_caps').
|
||||
try
|
||||
ClientInfo = #{zone => zone},
|
||||
ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts),
|
||||
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
|
||||
emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)),
|
||||
?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)),
|
||||
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true}))
|
||||
after
|
||||
emqx_zone:unset_env(zone, '$mqtt_pub_caps')
|
||||
end.
|
||||
|
|
|
@ -468,8 +468,8 @@ t_connack_max_qos_allowed(_) ->
|
|||
|
||||
%% max_qos_allowed = 0
|
||||
emqx_zone:set_env(external, max_qos_allowed, 0),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
|
||||
emqx_zone:unset_env(external, '$mqtt_caps'),
|
||||
emqx_zone:unset_env(external, '$mqtt_pub_caps'),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, Connack1} = emqtt:connect(Client1),
|
||||
|
@ -496,8 +496,8 @@ t_connack_max_qos_allowed(_) ->
|
|||
|
||||
%% max_qos_allowed = 1
|
||||
emqx_zone:set_env(external, max_qos_allowed, 1),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
|
||||
emqx_zone:unset_env(external, '$mqtt_caps'),
|
||||
emqx_zone:unset_env(external, '$mqtt_pub_caps'),
|
||||
|
||||
{ok, Client3} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, Connack3} = emqtt:connect(Client3),
|
||||
|
@ -524,8 +524,8 @@ t_connack_max_qos_allowed(_) ->
|
|||
|
||||
%% max_qos_allowed = 2
|
||||
emqx_zone:set_env(external, max_qos_allowed, 2),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
|
||||
emqx_zone:unset_env(external, '$mqtt_caps'),
|
||||
emqx_zone:unset_env(external, '$mqtt_pub_caps'),
|
||||
|
||||
{ok, Client5} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, Connack5} = emqtt:connect(Client5),
|
||||
|
|
|
@ -31,12 +31,23 @@ start_slave(Name) ->
|
|||
start_slave(Name, #{}).
|
||||
|
||||
start_slave(Name, Opts) ->
|
||||
SlaveMod = maps:get(slave_mod, Opts, ct_slave),
|
||||
Node = make_node_name(Name),
|
||||
case ct_slave:start(Node, [{kill_if_fail, true},
|
||||
{monitor_master, true},
|
||||
{init_timeout, 10000},
|
||||
{startup_timeout, 10000},
|
||||
{erl_flags, ebin_path()}]) of
|
||||
DoStart =
|
||||
fun() ->
|
||||
case SlaveMod of
|
||||
ct_slave ->
|
||||
ct_slave:start(Node,
|
||||
[{kill_if_fail, true},
|
||||
{monitor_master, true},
|
||||
{init_timeout, 10000},
|
||||
{startup_timeout, 10000},
|
||||
{erl_flags, ebin_path()}]);
|
||||
slave ->
|
||||
slave:start_link(host(), Name, ebin_path())
|
||||
end
|
||||
end,
|
||||
case DoStart() of
|
||||
{ok, _} ->
|
||||
ok;
|
||||
{error, started_not_connected, _} ->
|
||||
|
@ -115,6 +126,9 @@ setup_node(Node, #{} = Opts) ->
|
|||
?assertEqual( node()
|
||||
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
|
||||
),
|
||||
|
||||
ok = snabbkaffe:forward_trace(Node),
|
||||
|
||||
ok.
|
||||
|
||||
%% Routes are replicated async.
|
||||
|
|
|
@ -32,6 +32,7 @@ init_per_suite(Config) ->
|
|||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
|
|
|
@ -51,7 +51,7 @@ init_per_suite(Config) ->
|
|||
PortDiscovery = application:get_env(gen_rpc, port_discovery),
|
||||
application:set_env(gen_rpc, port_discovery, stateless),
|
||||
application:ensure_all_started(gen_rpc),
|
||||
%% ensure emqx_moduels' app modules are loaded
|
||||
%% ensure emqx_modules app modules are loaded
|
||||
%% so the mnesia tables are created
|
||||
ok = load_app(emqx_modules),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
|
|
|
@ -32,25 +32,44 @@
|
|||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
ok.
|
||||
|
||||
init_per_testcase(Case, Config) ->
|
||||
?MODULE:Case({'init', Config}).
|
||||
|
||||
end_per_testcase(Case, Config) ->
|
||||
?MODULE:Case({'end', Config}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Testcases
|
||||
|
||||
t_takeover(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
t_takeover({init, Config}) when is_list(Config) ->
|
||||
Config;
|
||||
t_takeover({'end', Config}) when is_list(Config) ->
|
||||
ok;
|
||||
t_takeover(Config) when is_list(Config) ->
|
||||
AllMsgs = messages(?CNT),
|
||||
Pos = rand:uniform(?CNT),
|
||||
|
||||
ClientId = <<"clientid">>,
|
||||
{ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
ClientId = random_clientid(),
|
||||
ClientOpts = [{clientid, ClientId},
|
||||
{clean_start, false},
|
||||
{host, "127.0.0.1"},
|
||||
{port, 1883}
|
||||
],
|
||||
C1 =
|
||||
with_retry(
|
||||
fun() ->
|
||||
{ok, C} = emqtt:start_link(ClientOpts),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
C
|
||||
end, 5),
|
||||
emqtt:subscribe(C1, <<"t">>, 1),
|
||||
|
||||
spawn(fun() ->
|
||||
[begin
|
||||
emqx:publish(lists:nth(I, AllMsgs)),
|
||||
|
@ -59,31 +78,65 @@ t_takeover(_) ->
|
|||
end),
|
||||
emqtt:pause(C1),
|
||||
timer:sleep(?CNT*10),
|
||||
|
||||
load_meck(ClientId),
|
||||
spawn(fun() ->
|
||||
[begin
|
||||
emqx:publish(lists:nth(I, AllMsgs)),
|
||||
timer:sleep(rand:uniform(10))
|
||||
end || I <- lists:seq(Pos+1, ?CNT)]
|
||||
end),
|
||||
{ok, C2} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
|
||||
Received = all_received_publishs(),
|
||||
ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]),
|
||||
assert_messages_missed(AllMsgs, Received),
|
||||
assert_messages_order(AllMsgs, Received),
|
||||
|
||||
emqtt:disconnect(C2),
|
||||
unload_meck(ClientId).
|
||||
|
||||
t_takover_in_cluster(_) ->
|
||||
todo.
|
||||
try
|
||||
spawn(fun() ->
|
||||
[begin
|
||||
emqx:publish(lists:nth(I, AllMsgs)),
|
||||
timer:sleep(rand:uniform(10))
|
||||
end || I <- lists:seq(Pos+1, ?CNT)]
|
||||
end),
|
||||
{ok, C2} = emqtt:start_link(ClientOpts),
|
||||
%% C1 is going down, unlink it so the test can continue to run
|
||||
_ = monitor(process, C1),
|
||||
?assert(erlang:is_process_alive(C1)),
|
||||
unlink(C1),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
receive
|
||||
{'DOWN', _, process, C1, _} ->
|
||||
ok
|
||||
after 1000 ->
|
||||
ct:fail("timedout_waiting_for_old_connection_shutdown")
|
||||
end,
|
||||
Received = all_received_publishs(),
|
||||
ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]),
|
||||
assert_messages_missed(AllMsgs, Received),
|
||||
assert_messages_order(AllMsgs, Received),
|
||||
kill_process(C2, fun emqtt:stop/1)
|
||||
after
|
||||
unload_meck(ClientId)
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helpers
|
||||
|
||||
random_clientid() ->
|
||||
iolist_to_binary(["clientid", "-", integer_to_list(erlang:system_time())]).
|
||||
|
||||
kill_process(Pid, WithFun) ->
|
||||
_ = unlink(Pid),
|
||||
_ = monitor(process, Pid),
|
||||
try WithFun(Pid)
|
||||
catch _:_ -> ok
|
||||
end,
|
||||
receive
|
||||
{'DOWN', _, process, Pid, _} ->
|
||||
ok
|
||||
after 10_000 ->
|
||||
exit(Pid, kill),
|
||||
error(timeout)
|
||||
end.
|
||||
|
||||
with_retry(Fun, 1) -> Fun();
|
||||
with_retry(Fun, N) when N > 1 ->
|
||||
try
|
||||
Fun()
|
||||
catch
|
||||
_ : _ ->
|
||||
ct:sleep(1000),
|
||||
with_retry(Fun, N - 1)
|
||||
end.
|
||||
|
||||
load_meck(ClientId) ->
|
||||
meck:new(fake_conn_mod, [non_strict]),
|
||||
HookTakeover = fun(Pid, Msg = {takeover, 'begin'}) ->
|
||||
|
|
Loading…
Reference in New Issue