Merge branch 'release-v43' into EMQX-7665-cannot-clear-alarms

This commit is contained in:
Zaiming (Stone) Shi 2022-11-04 16:12:56 +01:00 committed by GitHub
commit 2c8874cfe0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
65 changed files with 1571 additions and 426 deletions

View File

@ -1,8 +1,4 @@
name: 'Detect profiles'
inputs:
ci_git_token:
required: true
type: string
outputs:
profiles:
description: 'Detected profiles'
@ -14,12 +10,18 @@ runs:
- id: detect-profiles
shell: bash
run: |
if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "::set-output name=profiles::[\"emqx-ee\"]"
echo "https://ci%40emqx.io:${{ inputs.ci_git_token }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
if [ -d source ]; then
## source code downloaded
cd source
fi
if [ ! -d .git ]; then
echo "Not git dir, $(pwd)"
exit 1
fi
if [ -f 'EMQX_ENTERPRISE' ]; then
echo "profiles=[\"emqx-ee\"]" >> $GITHUB_OUTPUT
echo "EMQX_NAME=emqx-ee" >> $GITHUB_ENV
else
echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]"
echo "profiles=[\"emqx\", \"emqx-edge\"]" >> $GITHUB_OUTPUT
echo "EMQX_NAME=emqx" >> $GITHUB_ENV
fi

View File

@ -27,27 +27,23 @@ runs:
shell: bash
run: |
brew update
brew install curl zip unzip gnu-sed kerl coreutils unixodbc freetds openssl@1.1
brew install curl zip unzip gnu-sed coreutils unixodbc freetds openssl@1.1
echo "/usr/local/opt/bison/bin" >> $GITHUB_PATH
echo "/usr/local/bin" >> $GITHUB_PATH
- uses: actions/cache@v2
- uses: actions/cache@v3
id: cache
with:
path: ~/.kerl/${{ inputs.otp }}
path: /opt/erlang/${{ inputs.otp }}
key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit
restore-keys: |
otp-install-${{ inputs.otp }}-${{ inputs.os }}
- name: build erlang
if: steps.cache.outputs.cache-hit != 'true'
shell: bash
env:
KERL_BUILD_BACKEND: git
OTP_GITHUB_URL: https://github.com/emqx/otp
KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit
run: |
kerl update releases
kerl build ${{ inputs.otp }}
kerl install ${{ inputs.otp }} $HOME/.kerl/${{ inputs.otp }}
git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git $HOME/otp-${{ inputs.otp }}
cd $HOME/otp-${{ inputs.otp }}
./configure --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit --prefix=/opt/erlang/${{ inputs.otp }}
make -j$(nproc)
sudo make install
- name: build
env:
AUTO_INSTALL_BUILD_DEPS: 1
@ -60,7 +56,7 @@ runs:
APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ inputs.apple_developer_id_bundle_password }}
shell: bash
run: |
. $HOME/.kerl/${{ inputs.otp }}/activate
export PATH="/opt/erlang/${{ inputs.otp }}/bin:$PATH"
make ensure-rebar3
sudo cp rebar3 /usr/local/bin/rebar3
make ${EMQX_NAME}-zip

View File

@ -26,14 +26,12 @@ jobs:
profiles: ${{ steps.detect-profiles.outputs.profiles}}
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
with:
path: source
fetch-depth: 0
- id: detect-profiles
- name: detect-profiles
id: detect-profiles
uses: ./source/.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- name: get_all_deps
if: endsWith(github.repository, 'emqx')
run: |
@ -109,10 +107,10 @@ jobs:
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx stop
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx install
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx uninstall
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v3
with:
name: ${{ matrix.profile }}
path: source/_packages/${{ matrix.profile }}/.
path: source/_packages/${{ matrix.profile }}/
mac:
needs: prepare
@ -126,7 +124,7 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- uses: actions/download-artifact@v2
- uses: actions/download-artifact@v3
with:
name: source
path: .
@ -135,10 +133,7 @@ jobs:
ln -s . source
unzip -q source.zip
rm source source.zip
- id: detect-profiles
uses: ./.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- uses: ./.github/actions/detect-profiles
- uses: ./.github/actions/package-macos
with:
otp: ${{ matrix.otp }}
@ -147,10 +142,10 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v3
with:
name: ${EMQX_NAME}-${{ matrix.otp }}
path: _packages/${EMQX_NAME}/.
path: _packages/${EMQX_NAME}/
linux:
runs-on: ubuntu-20.04
@ -263,7 +258,7 @@ jobs:
- uses: actions/upload-artifact@v1
with:
name: ${{ matrix.profile }}
path: /tmp/packages/${{ matrix.profile }}/.
path: /tmp/packages/${{ matrix.profile }}/
docker:
runs-on: ubuntu-20.04

View File

@ -30,16 +30,19 @@ jobs:
steps:
- uses: actions/checkout@v1
- uses: ./.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- name: fix-git-unsafe-repository
run: git config --global --add safe.directory /__w/emqx/emqx
- uses: actions/cache@v2
- uses: ./.github/actions/detect-profiles
- name: ensure access to github
if: endsWith(github.repository, 'enterprise')
run: |
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
- uses: actions/cache@v3
with:
# dialyzer PLTs
path: ~/.cache/rebar3/
key: dialyer-${{ matrix.otp }}
key: dialyzer-${{ matrix.otp }}
- name: make xref
run: make xref
- name: make dialyzer
@ -48,7 +51,7 @@ jobs:
run: make ${EMQX_NAME}-zip
- name: build deb/rpm packages
run: make ${EMQX_NAME}-pkg
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v3
if: failure()
with:
name: rebar3.crashdump
@ -57,7 +60,7 @@ jobs:
run: |
export CODE_PATH=$GITHUB_WORKSPACE
.ci/build_packages/tests.sh
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v3
with:
name: ${{ matrix.os }}
path: _packages/**/*.zip
@ -71,10 +74,13 @@ jobs:
- macos-11
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v3
- name: ensure access to github
if: endsWith(github.repository, 'enterprise')
run: |
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
- uses: ./.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- uses: ./.github/actions/package-macos
with:
otp: ${{ matrix.otp }}
@ -83,12 +89,12 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v3
if: failure()
with:
name: rebar3.crashdump
path: ./rebar3.crashdump
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v3
with:
name: macos
path: _packages/**/*.zip

View File

@ -13,14 +13,9 @@ jobs:
profiles: ${{ steps.detect-profiles.outputs.profiles}}
steps:
- uses: actions/checkout@v2
with:
path: source
fetch-depth: 0
- uses: actions/checkout@v3
- id: detect-profiles
uses: ./source/.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
uses: ./.github/actions/detect-profiles
upload:
runs-on: ubuntu-20.04
@ -59,12 +54,10 @@ jobs:
-X POST \
-d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ github.ref_name }}\" }" \
${{ secrets.EMQX_IO_RELEASE_API }}
- uses: actions/checkout@v2
with:
fetch-depth: 0
- uses: actions/checkout@v3
- name: get version
id: version
run: echo "::set-output name=version::$(./pkg-vsn.sh)"
run: echo "version=$(./pkg-vsn.sh)" >> $GITHUB_OUTPUT
- uses: emqx/push-helm-action@v1
if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
with:

View File

@ -10,140 +10,255 @@ on:
pull_request:
jobs:
run_proper_test:
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.3.4.9-3-ubuntu20.04
prepare:
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.3.4.9-3-ubuntu20.04
outputs:
ct_apps: ${{ steps.run_find_apps.outputs.ct_apps }}
steps:
- uses: actions/checkout@v3
with:
path: source
fetch-depth: 0
- name: git credentials
run: |
if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
fi
- name: find_ct_apps
working-directory: source
id: run_find_apps
# emqx_plugin_libs doesn't have a test suite -> excluded from app list
# emqx ct is run independently -> exclude it from the app list
run: |
ct_apps="$(./scripts/find-apps.sh --json | jq -c 'del (.[] | select (. == "apps/emqx_plugin_libs" or . == "emqx"))')"
echo "ct-apps: $ct_apps"
echo "ct_apps=$ct_apps" >> $GITHUB_OUTPUT
- name: get_all_deps
working-directory: source
run: |
make deps-all
./rebar3 as test compile
cd ..
zip -ryq source.zip source/* source/.[^.]*
- uses: actions/upload-artifact@v3
with:
name: source
path: source.zip
steps:
- uses: actions/checkout@v2
- name: set git credentials
run: |
if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
fi
- name: proper
run: make proper
eunit_and_proper:
needs: prepare
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.3.4.9-3-ubuntu20.04
strategy:
fail-fast: false
matrix:
task:
- eunit
- proper
steps:
- uses: AutoModality/action-clean@v1
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -o -q source.zip
# produces eunit.coverdata and proper.coverdata
- name: eunit and proper
working-directory: source
run: make ${{ matrix.task }}
- uses: actions/upload-artifact@v3
with:
name: cover
path: source/_build/test/cover
if-no-files-found: warn
run_common_test:
runs-on: ${{ matrix.runs-on }}
strategy:
fail-fast: false
matrix:
runs-on:
- aws-amd64
- ubuntu-20.04
use-self-hosted:
- ${{ github.repository_owner == 'emqx' }}
exclude:
- runs-on: ubuntu-20.04
use-self-hosted: true
- runs-on: aws-amd64
use-self-hosted: false
steps:
- uses: actions/checkout@v2
# to avoid dirty self-hosted runners
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker compose up
if: endsWith(github.repository, 'emqx')
env:
MYSQL_TAG: 8
REDIS_TAG: 6
MONGO_TAG: 4
PGSQL_TAG: 13
LDAP_TAG: 2.4.50
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
docker-compose \
-f .ci/docker-compose-file/docker-compose.yaml \
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
up -d --build
- name: docker compose up
if: endsWith(github.repository, 'emqx-enterprise')
env:
MYSQL_TAG: 8
REDIS_TAG: 6
MONGO_TAG: 4
PGSQL_TAG: 13
LDAP_TAG: 2.4.50
OPENTSDB_TAG: latest
INFLUXDB_TAG: 1.7.6
DYNAMODB_TAG: 1.11.477
TIMESCALE_TAG: latest-pg11
CASSANDRA_TAG: 3.11.6
RABBITMQ_TAG: 3.7
KAFKA_TAG: 2.5.0
PULSAR_TAG: 2.3.2
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
timeout-minutes: 20
run: |
docker-compose \
-f .ci/docker-compose-file/docker-compose.yaml \
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-cassandra-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-dynamodb-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-influxdb-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-kafka-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-opentsdb-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-pulsar-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-rabbit-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-timescale-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-mysql-client.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-pgsql-and-timescale-client.yaml \
up -d --build
docker exec -i erlang bash -c "echo \"https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com\" > /root/.git-credentials && git config --global credential.helper store"
docker exec -i erlang bash -c "git config --global --add safe.directory /emqx"
while [ $(docker ps -a --filter name=client --filter exited=0 | wc -l) \
!= $(docker ps -a --filter name=client | wc -l) ]; do
sleep 5
done
- name: run eunit
run: |
docker exec -i erlang bash -c "make eunit"
- name: run common test
run: |
docker exec -i erlang bash -c "make ct"
- name: run cover
run: |
printenv > .env
docker exec -i erlang bash -c "git config --global --add safe.directory /emqx"
docker exec -i erlang bash -c "make cover"
docker exec --env-file .env -i erlang bash -c "make coveralls"
- name: cat rebar.crashdump
if: failure()
run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi
- uses: actions/upload-artifact@v1
if: failure()
with:
name: logs
path: _build/test/logs
- uses: actions/upload-artifact@v1
with:
name: cover
path: _build/test/cover
emqx_ct:
needs: prepare
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.3.4.9-3-ubuntu20.04
strategy:
fail-fast: false
steps:
- uses: AutoModality/action-clean@v1
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -o -q source.zip
# produces emqx-emqx.coverdata
- name: emqx-ct-pipeline
working-directory: source
run: make emqx-ct-pipeline
- uses: actions/upload-artifact@v3
with:
name: cover
path: source/_build/test/cover
if-no-files-found: warn
finish:
needs: run_common_test
runs-on: ubuntu-20.04
steps:
- name: Coveralls Finished
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
curl -v -k https://coveralls.io/webhook \
--header "Content-Type: application/json" \
--data "{\"repo_name\":\"$GITHUB_REPOSITORY\",\"repo_token\":\"$GITHUB_TOKEN\",\"payload\":{\"build_num\":$GITHUB_RUN_ID,\"status\":\"done\"}}" || true
ct:
needs: prepare
runs-on: ${{ matrix.runs-on }}
strategy:
max-parallel: 12
fail-fast: false
matrix:
app_name: ${{ fromJson(needs.prepare.outputs.ct_apps) }}
runs-on:
- aws-amd64
- ubuntu-20.04
use-self-hosted:
- ${{ github.repository_owner == 'emqx' }}
exclude:
- runs-on: ubuntu-20.04
use-self-hosted: true
- runs-on: aws-amd64
use-self-hosted: false
steps:
- uses: AutoModality/action-clean@v1
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -q source.zip
# to avoid dirty self-hosted runners
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker compose up
working-directory: source
if: endsWith(github.repository, 'emqx')
env:
MYSQL_TAG: 8
REDIS_TAG: 6
MONGO_TAG: 4
PGSQL_TAG: 13
LDAP_TAG: 2.4.50
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
docker-compose \
-f .ci/docker-compose-file/docker-compose.yaml \
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
up -d --build
docker exec -i erlang bash -c "git config --global --add safe.directory /emqx"
- name: docker compose up
working-directory: source
if: endsWith(github.repository, 'emqx-enterprise')
env:
MYSQL_TAG: 8
REDIS_TAG: 6
MONGO_TAG: 4
PGSQL_TAG: 13
LDAP_TAG: 2.4.50
OPENTSDB_TAG: latest
INFLUXDB_TAG: 1.7.6
DYNAMODB_TAG: 1.11.477
TIMESCALE_TAG: latest-pg11
CASSANDRA_TAG: 3.11.6
RABBITMQ_TAG: 3.7
KAFKA_TAG: 2.5.0
PULSAR_TAG: 2.3.2
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
timeout-minutes: 20
run: |
docker-compose \
-f .ci/docker-compose-file/docker-compose.yaml \
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-cassandra-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-dynamodb-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-influxdb-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-kafka-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-opentsdb-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-pulsar-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-rabbit-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-timescale-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-mysql-client.yaml \
-f .ci/docker-compose-file/docker-compose-enterprise-pgsql-and-timescale-client.yaml \
up -d --build
docker exec -i erlang bash -c "echo \"https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com\" > /root/.git-credentials && git config --global credential.helper store"
docker exec -i erlang bash -c "git config --global --add safe.directory /emqx"
while [ $(docker ps -a --filter name=client --filter exited=0 | wc -l) \
!= $(docker ps -a --filter name=client | wc -l) ]; do
sleep 5
done
- name: run common test
run: docker exec -i erlang bash -c "make ${{ matrix.app_name }}-ct-pipeline"
- name: cat rebar.crashdump
if: failure()
working-directory: source
run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi
- name: set log file name
if: failure()
run: echo "LOGFILENAME=logs-$(echo ${{ matrix.app_name }} | tr '/' '_')" >> $GITHUB_ENV
- uses: actions/upload-artifact@v3
if: failure()
with:
name: ${{ env.LOGFILENAME }}
path: source/_build/test/logs
if-no-files-found: warn
- uses: actions/upload-artifact@v3
with:
name: cover
path: source/_build/test/cover
if-no-files-found: warn
make_cover:
needs:
- eunit_and_proper
- emqx_ct
- ct
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.3.4.9-3-ubuntu20.04
steps:
- uses: AutoModality/action-clean@v1
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -q source.zip
- uses: actions/download-artifact@v3
name: download cover data
with:
name: cover
path: source/_build/test/cover
- name: make cover
working-directory: source
run: make cover
- name: send to coveralls
working-directory: source
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: make coveralls
- name: get coveralls logs
working-directory: source
if: failure()
run: cat rebar3.crashdump
finish:
needs: make_cover
runs-on: ubuntu-20.04
steps:
- name: Coveralls Finished
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
curl -v -k https://coveralls.io/webhook \
--header "Content-Type: application/json" \
--data "{\"repo_name\":\"$GITHUB_REPOSITORY\",\"repo_token\":\"$GITHUB_TOKEN\",\"payload\":{\"build_num\":$GITHUB_RUN_ID,\"status\":\"done\"}}" || true

View File

@ -61,6 +61,14 @@ $1-ct: $(REBAR)
endef
$(foreach app,$(APPS),$(eval $(call gen-app-ct-target,$(app))))
## app/name-ct-pipeline targets are used in pipeline -> make cover data for each app
.PHONY: $(APPS:%=%-ct-pipeline)
define gen-app-ct-target-pipeline
$1-ct-pipeline: $(REBAR)
$(REBAR) ct --name 'test@127.0.0.1' -c -v --cover_export_name $(PROFILE)-$(subst /,-,$1) --suite $(shell $(CURDIR)/scripts/find-suites.sh $1)
endef
$(foreach app,$(APPS),$(eval $(call gen-app-ct-target-pipeline,$(app))))
## apps/name-prop targets
.PHONY: $(APPS:%=%-prop)
define gen-app-prop-target

View File

@ -53,9 +53,8 @@ check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path},
{stop, AuthResult#{auth_result => http_to_connack_error(Code),
anonymous => false}};
{error, Error} ->
?LOG(warning, "Deny connection from path: ~s, username: ~ts, due to "
"request http-server failed: ~0p",
[Path, Username, Error]),
?LOG_SENSITIVE(warning, "Deny connection from path: ~s, username: ~ts, due to "
"request http-server failed: ~0p", [Path, Username, Error]),
%%FIXME later: server_unavailable is not right.
{stop, AuthResult#{auth_result => server_unavailable,
anonymous => false}}
@ -89,10 +88,13 @@ is_superuser(SuperParams =
timeout := Timeout}, ClientInfo) ->
Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES),
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of
{ok, 200, _Body} -> true;
{ok, _Code, _Body} -> false;
{error, Error} -> ?LOG(warning, "Request superuser path ~s, error: ~p", [Path, Error]),
false
{ok, 200, _Body} ->
true;
{ok, _Code, _Body} ->
false;
{error, Error} ->
?LOG_SENSITIVE(warning, "Request superuser path ~s, error: ~p", [Path, Error]),
false
end.
mountpoint(Body, #{mountpoint := Mountpoint}) ->

View File

@ -1,13 +1,17 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.7",[{load_module,emqx_auth_jwt,brutal_purge,soft_purge,[]}]},
[{"4.3.7",
[{load_module,emqx_auth_jwt_svr,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_jwt,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-6]">>,
[{load_module,emqx_auth_jwt,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_jwt_svr,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-2]">>,[{restart_application,emqx_auth_jwt}]},
{<<".*">>,[]}],
[{"4.3.7",[{load_module,emqx_auth_jwt,brutal_purge,soft_purge,[]}]},
[{"4.3.7",
[{load_module,emqx_auth_jwt_svr,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_jwt,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-6]">>,
[{load_module,emqx_auth_jwt,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_jwt_svr,brutal_purge,soft_purge,[]}]},

View File

@ -27,7 +27,7 @@
%% APIs
-export([start_link/1]).
-export([verify/1]).
-export([verify/1, trace/2]).
%% gen_server callbacks
-export([ init/1
@ -143,7 +143,8 @@ request_jwks(Addr) ->
?tp(debug, emqx_auth_jwt_svr_jwks_updated, #{jwks => Jwks, pid => self()}),
Jwks
catch _:_ ->
?LOG(error, "Invalid jwks server response: ~p~n", [Body]),
?MODULE:trace(jwks_server_reesponse, Body),
?LOG(error, "Invalid jwks server response, body is not logged for security reasons, trace it if inspection is required", []),
error(badarg)
end
end.
@ -174,7 +175,7 @@ do_verify(JwsCompacted) ->
end
catch
Class : Reason : Stk ->
?LOG(error, "verify JWK crashed: ~p, ~p, stacktrace: ~p~n",
?LOG_SENSITIVE(error, "verify JWK crashed: ~p, ~p, stacktrace: ~p~n",
[Class, Reason, Stk]),
{error, invalid_signature}
end.
@ -249,13 +250,15 @@ key2jwt_value(Key, Func, Options) ->
V ->
try Func(V) of
{error, Reason} ->
?LOG(warning, "Build ~p JWK ~p failed: {error, ~p}~n",
?LOG_SENSITIVE(warning, "Build ~p JWK ~p failed: {error, ~p}~n",
[Key, V, Reason]),
undefined;
J -> J
catch T:R ->
?LOG(warning, "Build ~p JWK ~p failed: {~p, ~p}~n",
?LOG_SENSITIVE(warning, "Build ~p JWK ~p failed: {~p, ~p}~n",
[Key, V, T, R]),
undefined
end
end.
trace(_Tag, _Data) -> ok.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_ldap,
[{description, "EMQ X Authentication/ACL with LDAP"},
{vsn, "4.3.5"}, % strict semver, bump manually!
{vsn, "4.3.6"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_ldap_sup]},
{applications, [kernel,stdlib,eldap2,ecpool]},

View File

@ -1,11 +1,16 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
[{"4.3.5",
[{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>,
@ -14,11 +19,16 @@
{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
[{"4.3.5",
[{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>,

View File

@ -62,7 +62,7 @@ check(ClientInfo = #{username := Username, password := Password}, AuthResult,
{error, not_found} ->
ok;
{error, ResultCode} ->
?LOG(error, "[LDAP] Auth from ldap failed: ~p", [ResultCode]),
?LOG_SENSITIVE(error, "[LDAP] Auth from ldap failed: ~p", [ResultCode]),
{stop, AuthResult#{auth_result => ResultCode, anonymous => false}}
end.

View File

@ -54,22 +54,22 @@ connect(Opts) ->
false ->
[{port, Port}, {timeout, Timeout}]
end,
?LOG(debug, "[LDAP] Connecting to OpenLDAP server: ~p, Opts:~p ...", [Servers, LdapOpts]),
?LOG_SENSITIVE(debug, "[LDAP] Connecting to OpenLDAP server: ~p, Opts:~p ...", [Servers, LdapOpts]),
case eldap2:open(Servers, LdapOpts) of
{ok, LDAP} ->
try eldap2:simple_bind(LDAP, BindDn, BindPassword) of
ok -> {ok, LDAP};
{error, Error} ->
?LOG(error, "[LDAP] Can't authenticated to OpenLDAP server: ~p", [Error]),
?LOG_SENSITIVE(error, "[LDAP] Can't authenticated to OpenLDAP server: ~p", [Error]),
{error, Error}
catch
error:Reason ->
?LOG(error, "[LDAP] Can't authenticated to OpenLDAP server: ~p", [Reason]),
?LOG_SENSITIVE(error, "[LDAP] Can't authenticated to OpenLDAP server: ~p", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?LOG(error, "[LDAP] Can't connect to OpenLDAP server: ~p", [Reason]),
?LOG_SENSITIVE(error, "[LDAP] Can't connect to OpenLDAP server: ~p", [Reason]),
{error, Reason}
end.
@ -147,4 +147,3 @@ init_args(ENVS) ->
match_objectclass => ObjectClass,
username_attr => UidAttr,
password_attr => PasswdAttr}}.

View File

@ -122,8 +122,8 @@ cli(_) ->
, {"acl list ", "List all acls"}
, {"acl show clientid <Clientid>", "Lookup clientid acl detail"}
, {"acl show username <Username>", "Lookup username acl detail"}
, {"acl aad clientid <Clientid> <Topic> <Action> <Access>", "Add clientid acl"}
, {"acl add Username <Username> <Topic> <Action> <Access>", "Add username acl"}
, {"acl add clientid <Clientid> <Topic> <Action> <Access>", "Add clientid acl"}
, {"acl add username <Username> <Topic> <Action> <Access>", "Add username acl"}
, {"acl add _all <Topic> <Action> <Access>", "Add $all acl"}
, {"acl delete clientid <Clientid> <Topic>", "Delete clientid acl"}
, {"acl delete username <Username> <Topic>", "Delete username acl"}

View File

@ -1,6 +1,6 @@
{application, emqx_auth_mnesia,
[{description, "EMQ X Authentication with Mnesia"},
{vsn, "4.3.9"}, % strict semver, bump manually
{vsn, "4.3.10"}, % strict semver, bump manually
{modules, []},
{registered, []},
{applications, [kernel,stdlib,mnesia]},

View File

@ -1,7 +1,9 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.7",
[{"4.3.9",[{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}]},
{"4.3.8",[{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[5-6]">>,
@ -33,7 +35,9 @@
{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mnesia_app,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.7",
[{"4.3.9",[{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}]},
{"4.3.8",[{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[5-6]">>,

View File

@ -55,7 +55,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
undefined -> ok;
{error, Reason} ->
?tp(emqx_auth_mongo_check_authn_error, #{error => Reason}),
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
?LOG_SENSITIVE(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
{stop, AuthResult#{auth_result => not_authorized, anonymous => false}};
UserMap ->
Result = case [maps:get(Field, UserMap, undefined) || Field <- Fields] of
@ -72,7 +72,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
anonymous => false,
auth_result => success}};
{error, Error} ->
?LOG(error, "[MongoDB] check auth fail: ~p", [Error]),
?LOG_SENSITIVE(error, "[MongoDB] check auth fail: ~p", [Error]),
{stop, AuthResult#{auth_result => Error, anonymous => false}}
end
end.
@ -99,7 +99,7 @@ is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Sele
false;
{error, Reason} ->
?tp(emqx_auth_mongo_superuser_query_error, #{error => Reason}),
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
?LOG_SENSITIVE(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
false;
Row ->
case maps:get(Field, Row, false) of

View File

@ -1,6 +1,6 @@
{application, emqx_auth_mysql,
[{description, "EMQ X Authentication/ACL with MySQL"},
{vsn, "4.3.3"}, % strict semver, bump manually!
{vsn, "4.3.4"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_mysql_sup]},
{applications, [kernel,stdlib,mysql,ecpool]},

View File

@ -1,18 +1,28 @@
%% -*- mode: erlang -*-
{VSN,
[{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
[{"4.3.3",
[{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_mysql,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
[{"4.3.3",
[{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_mysql,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]

View File

@ -41,7 +41,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{ok, _Columns, []} ->
{error, not_found};
{error, Reason} ->
?LOG(error, "[MySQL] query '~p' failed: ~p", [AuthSql, Reason]),
?LOG_SENSITIVE(error, "[MySQL] query '~p' failed: ~p", [AuthSql, Reason]),
{error, Reason}
end,
case CheckPass of
@ -52,7 +52,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{error, not_found} ->
ok;
{error, ResultCode} ->
?LOG(error, "[MySQL] Auth from mysql failed: ~p", [ResultCode]),
?LOG_SENSITIVE(error, "[MySQL] Auth from mysql failed: ~p", [ResultCode]),
{stop, AuthResult#{auth_result => ResultCode, anonymous => false}}
end.

View File

@ -54,10 +54,10 @@ connect(Options) ->
?LOG(error, "[MySQL] Can't connect to MySQL server: Connection refused."),
{error, Reason};
{error, Reason = {ErrorCode, _, Error}} ->
?LOG(error, "[MySQL] Can't connect to MySQL server: ~p - ~p", [ErrorCode, Error]),
?LOG_SENSITIVE(error, "[MySQL] Can't connect to MySQL server: ~p - ~p", [ErrorCode, Error]),
{error, Reason};
{error, Reason} ->
?LOG(error, "[MySQL] Can't connect to MySQL server: ~p", [Reason]),
?LOG_SENSITIVE(error, "[MySQL] Can't connect to MySQL server: ~p", [Reason]),
{error, Reason}
end.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_pgsql,
[{description, "EMQ X Authentication/ACL with PostgreSQL"},
{vsn, "4.3.3"}, % strict semver, bump manually!
{vsn, "4.3.4"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_pgsql_sup]},
{applications, [kernel,stdlib,epgsql,ecpool]},

View File

@ -1,11 +1,17 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{<<"4\\.3\\.[0-2]">>,
[{"4.3.3",
[{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-2]">>,
%% restart it due to epgsql upgraded from 4.4.0 to 4.6.0
%% in emqx_auth_pgsql:v4.3.3
[{restart_application,emqx_auth_pgsql}]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[0-2]">>,
[{"4.3.3",
[{load_module,emqx_auth_pgsql,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_pgsql_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-2]">>,
[{restart_application,emqx_auth_pgsql}]},
{<<".*">>,[]}]}.

View File

@ -40,7 +40,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{ok, _, []} ->
{error, not_found};
{error, Reason} ->
?LOG(error, "[Postgres] query '~p' failed: ~p", [AuthSql, Reason]),
?LOG_SENSITIVE(error, "[Postgres] query '~p' failed: ~p", [AuthSql, Reason]),
{error, not_found}
end,
case CheckPass of
@ -51,7 +51,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{error, not_found} ->
ok;
{error, ResultCode} ->
?LOG(error, "[Postgres] Auth from pgsql failed: ~p", [ResultCode]),
?LOG_SENSITIVE(error, "[Postgres] Auth from pgsql failed: ~p", [ResultCode]),
{stop, AuthResult#{auth_result => ResultCode, anonymous => false}}
end.

View File

@ -82,7 +82,7 @@ connect(Opts) ->
?LOG(error, "[Postgres] Can't connect to Postgres server: Invalid password."),
{error, Reason};
{error, Reason} ->
?LOG(error, "[Postgres] Can't connect to Postgres server: ~p", [Reason]),
?LOG_SENSITIVE(error, "[Postgres] Can't connect to Postgres server: ~p", [Reason]),
{error, Reason}
end.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_redis,
[{description, "EMQ X Authentication/ACL with Redis"},
{vsn, "4.3.3"}, % strict semver, bump manually!
{vsn, "4.3.4"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_redis_sup]},
{applications, [kernel,stdlib,eredis,eredis_cluster,ecpool]},

View File

@ -1,19 +1,29 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
[{"4.3.3",
[{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_redis,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
[{"4.3.3",
[{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_redis,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]
}.
{<<".*">>,[]}]}.

View File

@ -42,7 +42,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{ok, [PassHash, Salt|_]} ->
check_pass({PassHash, Salt, Password}, HashType);
{error, Reason} ->
?LOG(error, "[Redis] Command: ~p failed: ~p", [AuthCmd, Reason]),
?LOG_SENSITIVE(error, "[Redis] Command: ~p failed: ~p", [AuthCmd, Reason]),
{error, not_found}
end,
case CheckPass of
@ -54,7 +54,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{error, not_found} ->
ok;
{error, ResultCode} ->
?LOG(error, "[Redis] Auth from redis failed: ~p", [ResultCode]),
?LOG_SENSITIVE(error, "[Redis] Auth from redis failed: ~p", [ResultCode]),
{stop, AuthResult#{auth_result => ResultCode, anonymous => false}}
end.

View File

@ -56,7 +56,7 @@ connect(Opts) ->
?LOG(error, "[Redis] Can't connect to Redis server: Authentication failed."),
{error, Reason};
{error, Reason} ->
?LOG(error, "[Redis] Can't connect to Redis server: ~p", [Reason]),
?LOG_SENSITIVE(error, "[Redis] Can't connect to Redis server: ~p", [Reason]),
{error, Reason}
end.
@ -86,4 +86,3 @@ repl(S, _Var, undefined) ->
repl(S, Var, Val) ->
NVal = re:replace(Val, "&", "\\\\&", [global, {return, list}]),
re:replace(S, Var, NVal, [{return, list}]).

View File

@ -40,6 +40,7 @@
-define(RESOURCE_TYPE_MQTT, 'bridge_mqtt').
-define(RESOURCE_TYPE_RPC, 'bridge_rpc').
-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish).
-define(RESOURCE_CONFIG_SPEC_MQTT, #{
address => #{
@ -494,7 +495,7 @@ on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName,
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
TopicTks = case ForwardTopic == <<"">> of
true -> undefined;
false -> emqx_rule_utils:preproc_tmpl(ForwardTopic)
false -> emqx_rule_utils:preproc_tmpl(assert_topic_valid(ForwardTopic))
end,
Opts.
@ -515,7 +516,7 @@ on_action_data_to_mqtt_broker(Msg, _Env =
qos = QoS,
from = From,
flags = Flags,
topic = Topic1,
topic = assert_topic_valid(Topic1),
payload = format_data(PayloadTks, Msg),
timestamp = TimeStamp},
ecpool:with_client(PoolName,
@ -583,7 +584,7 @@ options(Options, PoolName, ResId) ->
Get = fun(Key) -> GetD(Key, undefined) end,
Address = Get(<<"address">>),
[{max_inflight_batches, 32},
{forward_mountpoint, str(Get(<<"mountpoint">>))},
{forward_mountpoint, str(assert_topic_valid(Get(<<"mountpoint">>)))},
{disk_cache, cuttlefish_flag:parse(str(GetD(<<"disk_cache">>, "off")))},
{start_type, auto},
{reconnect_delay_ms, cuttlefish_duration:parse(str(Get(<<"reconnect_interval">>)), ms)},
@ -610,6 +611,12 @@ options(Options, PoolName, ResId) ->
| maybe_ssl(Options, Get(<<"ssl">>), ResId)]
end.
assert_topic_valid(T) ->
case emqx_topic:wildcard(T) of
true -> throw({?BAD_TOPIC_WITH_WILDCARD, T});
false -> T
end.
maybe_ssl(_Options, false, _ResId) ->
[];
maybe_ssl(Options, true, ResId) ->

View File

@ -1,6 +1,6 @@
{application, emqx_coap,
[{description, "EMQ X CoAP Gateway"},
{vsn, "4.3.1"}, % strict semver, bump manually!
{vsn, "4.3.2"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,gen_coap]},

View File

@ -1,9 +1,15 @@
%% -*-: erlang -*-
{VSN,
[{"4.3.0",[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
[{<<"4\\.3\\.[0-1]">>,[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []},
{load_module, emqx_coap_pubsub_resource, brutal_purge, soft_purge, []},
{load_module, emqx_coap_resource, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}],
[{"4.3.0",[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
[{<<"4\\.3\\.[0-1]">>,[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []},
{load_module, emqx_coap_pubsub_resource, brutal_purge, soft_purge, []},
{load_module, emqx_coap_resource, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}]
}.

View File

@ -31,6 +31,9 @@
-export([ subscribe/2
, unsubscribe/2
, publish/3
, received_puback/2
, message_payload/1
, message_topic/1
]).
-export([ client_pid/4
@ -95,6 +98,15 @@ unsubscribe(Pid, Topic) ->
publish(Pid, Topic, Payload) ->
gen_server:call(Pid, {publish, Topic, Payload}).
received_puback(Pid, Msg) ->
gen_server:cast(Pid, {received_puback, Msg}).
message_payload(#message{payload = Payload}) ->
Payload.
message_topic(#message{topic = Topic}) ->
Topic.
%% For emqx_management plugin
call(Pid, Msg) ->
call(Pid, Msg, infinity).
@ -172,13 +184,19 @@ handle_call(Request, _From, State) ->
?LOG(error, "adapter unexpected call ~p", [Request]),
{reply, ignored, State, hibernate}.
handle_cast({received_puback, Msg}, State) ->
%% NOTE: the counter named 'messages.acked', but the hook named 'message.acked'!
ok = emqx_metrics:inc('messages.acked'),
_ = emqx_hooks:run('message.acked', [conninfo(State), Msg]),
{noreply, State, hibernate};
handle_cast(Msg, State) ->
?LOG(error, "broker_api unexpected cast ~p", [Msg]),
{noreply, State, hibernate}.
handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}},
handle_info({deliver, _Topic, #message{} = Msg},
State = #state{sub_topics = Subscribers}) ->
deliver([{Topic, Payload}], Subscribers),
deliver([Msg], Subscribers),
{noreply, State, hibernate};
handle_info(check_alive, State = #state{sub_topics = []}) ->
@ -271,27 +289,25 @@ packet_to_message(Topic, Payload,
%% Deliver
deliver([], _) -> ok;
deliver([Pub | More], Subscribers) ->
ok = do_deliver(Pub, Subscribers),
deliver([Msg | More], Subscribers) ->
ok = do_deliver(Msg, Subscribers),
deliver(More, Subscribers).
do_deliver({Topic, Payload}, Subscribers) ->
do_deliver(Msg, Subscribers) ->
%% handle PUBLISH packet from broker
?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]),
deliver_to_coap(Topic, Payload, Subscribers),
?LOG(debug, "deliver message from broker, msg: ~p", [Msg]),
deliver_to_coap(Msg, Subscribers),
ok.
deliver_to_coap(_TopicName, _Payload, []) ->
deliver_to_coap(_Msg, []) ->
ok;
deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) ->
deliver_to_coap(#message{topic = TopicName} = Msg, [{TopicFilter, {IsWild, CoapPid}} | T]) ->
Matched = case IsWild of
true -> emqx_topic:match(TopicName, TopicFilter);
false -> TopicName =:= TopicFilter
end,
%?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p",
% [Matched, CoapPid, TopicName, Payload, T]),
Matched andalso (CoapPid ! {dispatch, TopicName, Payload}),
deliver_to_coap(TopicName, Payload, T).
Matched andalso (CoapPid ! {dispatch, Msg}),
deliver_to_coap(Msg, T).
%%--------------------------------------------------------------------
%% Helper funcs
@ -328,12 +344,13 @@ chann_info(State) ->
will_msg => undefined
}.
conninfo(#state{peername = Peername,
conninfo(#state{peername = {PeerHost, _} = Peername,
clientid = ClientId,
connected_at = ConnectedAt}) ->
#{socktype => udp,
sockname => {{127, 0, 0, 1}, 5683},
peername => Peername,
peerhost => PeerHost,
peercert => nossl, %% TODO: dtls
conn_mod => ?MODULE,
proto_name => <<"CoAP">>,

View File

@ -138,16 +138,32 @@ coap_unobserve({state, ChId, Prefix, TopicPath}) ->
ok.
handle_info({dispatch, Topic, Payload}, State) ->
%% This clause should never be matched any more. We keep it here to handle
%% the old format messages during the release upgrade.
%% In this case the second function clause of `coap_ack/2` will be called,
%% and the ACKs is discarded.
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info({dispatch, Msg}, State) ->
Payload = emqx_coap_mqtt_adapter:message_payload(Msg),
Topic = emqx_coap_mqtt_adapter:message_topic(Msg),
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
{notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info(Message, State) ->
?LOG(error, "Unknown Message ~p", [Message]),
{noreply, State}.
coap_ack(_Ref, State) -> {ok, State}.
coap_ack({pub, Msg}, State) ->
?LOG(debug, "received coap ack for publish msg: ~p", [Msg]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:received_puback(Pid, Msg),
{ok, State};
coap_ack(_Ref, State) ->
?LOG(debug, "received coap ack: ~p", [_Ref]),
{ok, State}.
%%--------------------------------------------------------------------
%% Internal Functions

View File

@ -104,12 +104,26 @@ coap_unobserve({state, ChId, Prefix, Topic}) ->
ok.
handle_info({dispatch, Topic, Payload}, State) ->
%% This clause should never be matched any more. We keep it here to handle
%% the old format messages during the release upgrade.
%% In this case the second function clause of `coap_ack/2` will be called,
%% and the ACKs is discarded.
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info({dispatch, Msg}, State) ->
Payload = emqx_coap_mqtt_adapter:message_payload(Msg),
{notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info(Message, State) ->
emqx_coap_mqtt_adapter:handle_info(Message, State).
coap_ack(_Ref, State) -> {ok, State}.
coap_ack({pub, Msg}, State) ->
?LOG(debug, "received coap ack for publish msg: ~p", [Msg]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:received_puback(Pid, Msg),
{ok, State};
coap_ack(_Ref, State) ->
?LOG(debug, "received coap ack: ~p", [_Ref]),
{ok, State}.
get_auth(Query) ->
get_auth(Query, #coap_mqtt_auth{}).

View File

@ -91,7 +91,7 @@ t_observe(_Config) ->
Topic = <<"abc">>, TopicStr = binary_to_list(Topic),
Payload = <<"123">>,
Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
{ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri),
{ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri),
?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]),
[SubPid] = emqx:subscribers(Topic),
@ -195,12 +195,16 @@ t_one_clientid_sub_2_topics(_Config) ->
[SubPid] = emqx:subscribers(Topic2),
?assert(is_pid(SubPid)),
CntrAcked1 = emqx_metrics:val('messages.acked'),
emqx:publish(emqx_message:make(Topic1, Payload1)),
Notif1 = receive_notification(),
?LOGT("observer 1 get Notif=~p", [Notif1]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv1}} = Notif1,
?assertEqual(Payload1, PayloadRecv1),
timer:sleep(100),
CntrAcked2 = emqx_metrics:val('messages.acked'),
?assertEqual(CntrAcked2, CntrAcked1 + 1),
emqx:publish(emqx_message:make(Topic2, Payload2)),
@ -208,6 +212,9 @@ t_one_clientid_sub_2_topics(_Config) ->
?LOGT("observer 2 get Notif=~p", [Notif2]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2,
?assertEqual(Payload2, PayloadRecv2),
timer:sleep(100),
CntrAcked3 = emqx_metrics:val('messages.acked'),
?assertEqual(CntrAcked3, CntrAcked2 + 1),
er_coap_observer:stop(Pid1),
er_coap_observer:stop(Pid2).

View File

@ -20,6 +20,16 @@ management.default_application.id = admin
## Value: String
management.default_application.secret = public
## Initialize apps file
## Is used to add administrative app/secrets when EMQX is launched for the first time.
## This config will not take any effect once EMQX database has one or more apps.
## The file content format is as below:
## ```
##819e5db182cf:l9C5suZClIF3FvdzWqmINrVU61WNfIjcglxw9CVM7y1VI
##bb5a6cf1c06a:WuNRRgcRTGiNcuyrE49Bpwz4PGPrRnP4hUMi647kNSbN
## ```
# management.bootstrap_apps_file = {{ platform_etc_dir }}/bootstrap_apps.txt
##--------------------------------------------------------------------
## HTTP Listener

View File

@ -6,6 +6,11 @@
{datatype, integer}
]}.
{mapping, "management.bootstrap_apps_file", "emqx_management.bootstrap_apps_file", [
{datatype, string},
hidden
]}.
{mapping, "management.default_application.id", "emqx_management.default_application_id", [
{default, undefined},
{datatype, string}

View File

@ -25,11 +25,16 @@
]).
start(_Type, _Args) ->
{ok, Sup} = emqx_mgmt_sup:start_link(),
_ = emqx_mgmt_auth:add_default_app(),
emqx_mgmt_http:start_listeners(),
emqx_mgmt_cli:load(),
{ok, Sup}.
case emqx_mgmt_auth:init_bootstrap_apps() of
ok ->
{ok, Sup} = emqx_mgmt_sup:start_link(),
_ = emqx_mgmt_auth:add_default_app(),
emqx_mgmt_http:start_listeners(),
emqx_mgmt_cli:load(),
{ok, Sup};
{error, _Reason} = Error ->
Error
end.
stop(_State) ->
emqx_mgmt_http:stop_listeners().

View File

@ -35,6 +35,7 @@
, update_app/5
, del_app/1
, list_apps/0
, init_bootstrap_apps/0
]).
%% APP Auth/ACL API
@ -44,6 +45,8 @@
-record(mqtt_app, {id, secret, name, desc, status, expired}).
-define(BOOTSTRAP_TAG, <<"Bootstrapped From File">>).
-type(appid() :: binary()).
-type(appsecret() :: binary()).
@ -77,6 +80,68 @@ add_default_app() ->
add_app(AppId1, <<"Default">>, AppSecret1, <<"Application user">>, true, undefined)
end.
init_bootstrap_apps() ->
Bootstrap = application:get_env(emqx_management, bootstrap_apps_file, undefined),
Size = mnesia:table_info(mqtt_app, size),
init_bootstrap_apps(Bootstrap, Size).
init_bootstrap_apps(undefined, _) -> ok;
init_bootstrap_apps(_File, Size)when Size > 0 -> ok;
init_bootstrap_apps(File, 0) ->
case file:open(File, [read, binary]) of
{ok, Dev} ->
{ok, MP} = re:compile(<<"(\.+):(\.+$)">>, [ungreedy]),
case init_bootstrap_apps(File, Dev, MP) of
ok -> ok;
Error ->
%% if failed add bootstrap users, we should clear all bootstrap apps
{atomic, ok} = mnesia:clear_table(mqtt_app),
Error
end;
{error, Reason} = Error ->
?LOG(error,
"failed to open the mgmt bootstrap apps file(~s) for ~p",
[File, Reason]
),
Error
end.
init_bootstrap_apps(File, Dev, MP) ->
try
add_bootstrap_app(File, Dev, MP, 1)
catch
throw:Error -> {error, Error};
Type:Reason:Stacktrace ->
{error, {Type, Reason, Stacktrace}}
after
file:close(Dev)
end.
add_bootstrap_app(File, Dev, MP, Line) ->
case file:read_line(Dev) of
{ok, Bin} ->
case re:run(Bin, MP, [global, {capture, all_but_first, binary}]) of
{match, [[AppId, AppSecret]]} ->
Name = <<"bootstraped">>,
case add_app(AppId, Name, AppSecret, ?BOOTSTRAP_TAG, true, undefined) of
{ok, _} ->
add_bootstrap_app(File, Dev, MP, Line + 1);
{error, Reason} ->
throw(#{file => File, line => Line, content => Bin, reason => Reason})
end;
_ ->
?LOG(error,
"failed to bootstrap apps file(~s) for Line(~w): ~ts",
[File, Line, Bin]
),
throw(#{file => File, line => Line, content => Bin, reason => "invalid format"})
end;
eof ->
ok;
{error, Error} ->
throw(#{file => File, line => Line, reason => Error})
end.
-spec(add_app(appid(), binary()) -> {ok, appsecret()} | {error, term()}).
add_app(AppId, Name) when is_binary(AppId) ->
add_app(AppId, Name, <<"Application user">>, true, undefined).

View File

@ -32,8 +32,11 @@ init_per_suite(Cfg) ->
ok = emqx_dashboard_admin:mnesia(boot),
application:load(emqx_modules),
application:load(emqx_bridge_mqtt),
ekka_mnesia:start(),
emqx_dashboard_admin:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]),
application:ensure_all_started(emqx_dashboard),
ok = emqx_rule_engine:load_providers(),
Cfg.
end_per_suite(Cfg) ->
@ -185,4 +188,4 @@ remove_resources() ->
lists:foreach(fun(#resource{id = Id}) ->
emqx_rule_engine:delete_resource(Id)
end, emqx_rule_registry:get_resources()),
timer:sleep(500).
timer:sleep(500).

View File

@ -0,0 +1,91 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_bootstrap_app_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
application:load(emqx_modules),
application:load(emqx_modules_spec),
application:load(emqx_management),
application:stop(emqx_rule_engine),
ekka_mnesia:start(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_) ->
ok = application:unset_env(emqx_management, bootstrap_apps_file),
_ = mnesia:clear_table(mqtt_app),
emqx_ct_helpers:stop_apps([]),
ok.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_load_ok(_) ->
application:stop(emqx_management),
Bin = <<"test-1:secret-1\ntest-2:secret-2">>,
File = "./bootstrap_apps.txt",
ok = file:write_file(File, Bin),
_ = mnesia:clear_table(mqtt_app),
application:set_env(emqx_management, bootstrap_apps_file, File),
{ok, _} = application:ensure_all_started(emqx_management),
?assert(emqx_mgmt_auth:is_authorized(<<"test-1">>, <<"secret-1">>)),
?assert(emqx_mgmt_auth:is_authorized(<<"test-2">>, <<"secret-2">>)),
?assertNot(emqx_mgmt_auth:is_authorized(<<"test-2">>, <<"secret-1">>)),
application:stop(emqx_management).
t_bootstrap_user_file_not_found(_) ->
File = "./bootstrap_apps_not_exist.txt",
check_load_failed(File),
ok.
t_load_invalid_username_failed(_) ->
Bin = <<"test-1:password-1\ntest&2:password-2">>,
File = "./bootstrap_apps.txt",
ok = file:write_file(File, Bin),
check_load_failed(File),
ok.
t_load_invalid_format_failed(_) ->
Bin = <<"test-1:password-1\ntest-2password-2">>,
File = "./bootstrap_apps.txt",
ok = file:write_file(File, Bin),
check_load_failed(File),
ok.
check_load_failed(File) ->
_ = mnesia:clear_table(mqtt_app),
application:stop(emqx_management),
application:set_env(emqx_management, bootstrap_apps_file, File),
?assertMatch({error, _}, application:ensure_all_started(emqx_management)),
?assertNot(lists:member(emqx_management, application:which_applications())),
?assertEqual(0, mnesia:table_info(mqtt_app, size)).

View File

@ -22,6 +22,8 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish).
-define(REPUBLISH_PARAMS_SPEC, #{
target_topic => #{
order => 1,
@ -163,7 +165,7 @@ on_action_create_republish(Id, Params = #{
}) ->
TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)),
TargetQoS = to_qos(TargetQoS0),
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic),
TopicTks = emqx_rule_utils:preproc_tmpl(assert_topic_valid(TargetTopic)),
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
Params.
@ -201,7 +203,7 @@ on_action_republish(Selected, _Envs = #{
from = ActId,
flags = Flags#{retain => get_retain(TargetRetain, Selected)},
headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)),
payload = format_msg(PayloadTks, Selected),
timestamp = Timestamp
},
@ -226,7 +228,7 @@ on_action_republish(Selected, _Envs = #{
from = ActId,
flags = #{dup => false, retain => get_retain(TargetRetain, Selected)},
headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)),
payload = format_msg(PayloadTks, Selected),
timestamp = erlang:system_time(millisecond)
},
@ -270,6 +272,12 @@ get_qos(-1, _Data, Default) -> Default;
get_qos(TargetQoS, Data, _Default) ->
qos(emqx_rule_utils:replace_var(TargetQoS, Data)).
assert_topic_valid(T) ->
case emqx_topic:wildcard(T) of
true -> throw({?BAD_TOPIC_WITH_WILDCARD, T});
false -> T
end.
qos(<<"0">>) -> 0;
qos(<<"1">>) -> 1;
qos(<<"2">>) -> 2;

View File

@ -277,7 +277,7 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) ->
_ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok,
init_resource, InitArgs)
catch throw : Reason ->
?LOG(error, "create_resource failed: ~0p", [Reason])
?LOG_SENSITIVE(warning, "create_resource failed: ~0p", [Reason])
end,
{ok, Resource};
no_retry ->
@ -285,6 +285,7 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) ->
_ = ?CLUSTER_CALL(init_resource, InitArgs),
{ok, Resource}
catch throw : Reason ->
?LOG_SENSITIVE(error, "create_resource failed: ~0p", [Reason]),
{error, Reason}
end
end;

View File

@ -136,16 +136,22 @@ t_preproc_sql5(_) ->
emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)).
t_if_contains_placeholder(_) ->
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}${b}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a},${b},${c}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a},b:${b}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"abc${ab}">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc$">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${a">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${ab">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a${ab${c${e">>)),
ok.
TestTab =
[ {true, "${a}"}
, {true, "${a}${b}"}
, {true, "${a},${b},${c}"}
, {true, "a:${a}"}
, {true, "a:${a},b:${b}"}
, {true, "abc${ab}"}
, {true, "a${ab${c}${e"}
, {false, "a"}
, {false, "abc$"}
, {false, "abc${"}
, {false, "abc${a"}
, {false, "abc${ab"}
, {false, "a${ab${c${e"}
],
lists:foreach(fun({Expected, InputStr}) ->
?assert(Expected =:= emqx_rule_utils:if_contains_placeholder(InputStr)),
?assert(Expected =:= emqx_rule_utils:if_contains_placeholder(iolist_to_binary(InputStr)))
end, TestTab).

View File

@ -56,7 +56,10 @@ groups() ->
{rulesql_select_events, [],
[ t_sqlparse_event_client_connected_01
, t_sqlparse_event_client_connected_02
, t_sqlparse_event_client_disconnected
, t_sqlparse_event_client_disconnected_normal
, t_sqlparse_event_client_disconnected_kicked
, t_sqlparse_event_client_disconnected_discarded
, t_sqlparse_event_client_disconnected_takeovered
, t_sqlparse_event_session_subscribed
, t_sqlparse_event_session_unsubscribed
, t_sqlparse_event_message_delivered
@ -145,6 +148,12 @@ end_per_group(_Groupname, _Config) ->
%% Testcase specific setup/teardown
%%------------------------------------------------------------------------------
init_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) ->
application:set_env(emqx, client_disconnect_discarded, true),
Config;
init_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) ->
application:set_env(emqx, client_disconnect_takeovered, true),
Config;
init_per_testcase(_TestCase, Config) ->
init_events_counters(),
ok = emqx_rule_registry:register_resource_types(
@ -152,6 +161,12 @@ init_per_testcase(_TestCase, Config) ->
%ct:pal("============ ~p", [ets:tab2list(emqx_resource_type)]),
Config.
end_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) ->
application:set_env(emqx, client_disconnect_takeovered, false), %% back to default
Config;
end_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) ->
application:set_env(emqx, client_disconnect_discarded, false), %% back to default
Config;
end_per_testcase(_TestCase, _Config) ->
ok.
@ -523,9 +538,118 @@ t_sqlparse_event_client_connected_02(_Config) ->
emqx_rule_registry:remove_rule(TopicRule).
%% FROM $events/client_disconnected
t_sqlparse_event_client_disconnected(_Config) ->
%% TODO
ok.
t_sqlparse_event_client_disconnected_normal(_Config) ->
ok = emqx_rule_engine:load_providers(),
Sql = "select * "
"from \"$events/client_disconnected\" ",
RepubT = <<"repub/to/disconnected/normal">>,
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
{ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client1),
emqtt:disconnect(Client1),
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(RepubT, T),
?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps]))
after 1000 ->
ct:fail(wait_for_repub_disconnected_normal)
end,
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
t_sqlparse_event_client_disconnected_kicked(_Config) ->
ok = emqx_rule_engine:load_providers(),
Sql = "select * "
"from \"$events/client_disconnected\" ",
RepubT = <<"repub/to/disconnected/kicked">>,
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(ClientRecvRepub),
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client1),
emqx_cm:kick_session(<<"emqx">>),
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(RepubT, T),
?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps]))
after 1000 ->
ct:fail(wait_for_repub_disconnected_kicked)
end,
emqtt:stop(ClientRecvRepub),
emqx_rule_registry:remove_rule(TopicRule).
t_sqlparse_event_client_disconnected_discarded(_Config) ->
ok = emqx_rule_engine:load_providers(),
Sql = "select * "
"from \"$events/client_disconnected\" ",
RepubT = <<"repub/to/disconnected/discarded">>,
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(ClientRecvRepub),
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client1),
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]),
{ok, _} = emqtt:connect(Client2),
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(RepubT, T),
?assertMatch(#{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps]))
after 1000 ->
ct:fail(wait_for_repub_disconnected_discarded)
end,
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
emqx_rule_registry:remove_rule(TopicRule).
t_sqlparse_event_client_disconnected_takeovered(_Config) ->
ok = emqx_rule_engine:load_providers(),
Sql = "select * "
"from \"$events/client_disconnected\" ",
RepubT = <<"repub/to/disconnected/takeovered">>,
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(ClientRecvRepub),
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client1),
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]),
{ok, _} = emqtt:connect(Client2),
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(RepubT, T),
?assertMatch(#{<<"reason">> := <<"takeovered">>}, emqx_json:decode(Payload, [return_maps]))
after 1000 ->
ct:fail(wait_for_repub_disconnected_discarded)
end,
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
emqx_rule_registry:remove_rule(TopicRule).
%% FROM $events/session_subscribed
t_sqlparse_event_session_subscribed(_Config) ->

View File

@ -2,6 +2,14 @@
## Enhancements
- Remove useless information from the dashboard listener failure log [#9260](https://github.com/emqx/emqx/pull/9260).
- We now trigger the `'message.acked'` hook after the CoAP gateway sends a message to the device and receives the ACK from the device [#9264](https://github.com/emqx/emqx/pull/9264).
With this change, the CoAP gateway can be combined with the offline message caching function (in the
emqx enterprise), so that CoAP devices are able to read the missed messages from the database when
it is online again.
- Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239).
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
@ -15,9 +23,29 @@
- Added a log censor to avoid logging sensitive data [#9189](https://github.com/emqx/emqx/pull/9189).
If the data to be logged is a map or key-value list which contains sensitive key words such as `password`, the value is obfuscated as `******`.
- Enhanced log security in ACL modules, sensitive data will be obscured [#9242](https://github.com/emqx/emqx/pull/9242).
- Add `management.bootstrap_apps_file` configuration to bulk import default app/secret when EMQX initializes the database [#9273](https://github.com/emqx/emqx/pull/9273).
- Added two new configs for deterministic order of authentication and ACL checks [#9283](https://github.com/emqx/emqx/pull/9283).
The two new global config names are `auth_order` and `acl_order`.
When multiple ACL or auth plugins (or modules) are enabled, without this config, the order (in which each backend is queried)
is determined by the start/restart order of the plugin (or module).
Meaning, if a plugin (or module) is restarted after initial boot, it may get ordered to the end of the list.
With this config, you may set the order with a comma-speapated ACL or auth plugin names (or aliases).
For example: `acl_order = jwt,http`, this will make sure `jwt` is always checked before `http`,
meaning if JWT is not found (or no `acl` cliam) for a client, then the ACL check will fallback to use the HTTP backend.
- Added configurations to enable more `client.disconnected` events (and counter bumps) [#9267](https://github.com/emqx/emqx/pull/9267).
Prior to this change, the `client.disconnected` event (and counter bump) is triggered when a client
performs a 'normal' disconnect, or is 'kicked' by system admin, but NOT triggered when a
stale connection had to be 'discarded' (for clean session) or 'takenover' (for non-clean session).
Now it is possible to set configs `broker.client_disconnect_discarded` and `broker.client_disconnect_takenover` to `on` to enable the event in these scenarios.
## Bug fixes
- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
- Fix that after uploading a backup file with an non-ASCII filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places [#9185](https://github.com/emqx/emqx/pull/9185).
This is to avoid displaying floats like `0.30000000000000004` on the dashboard.
@ -42,4 +70,8 @@
Note that the `id` in `POST /api/v4/rules` should be literals (not encoded) when creating a `rule` or `resource`.
See docs [Create Rule](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) [Create Resource](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources).
- Calling 'DELETE /alarms/deactivated' now deletes deactived alarms on all nodes, including remote nodes, not just the local node. [#9280](https://github.com/emqx/emqx/pull/9280)
- Calling 'DELETE /alarms/deactivated' now deletes deactived alarms on all nodes, including remote nodes, not just the local node [#9280](https://github.com/emqx/emqx/pull/9280).
- When republishing messages or bridge messages to other brokers, check the validity of the topic and make sure it does not have topic wildcards [#9291](https://github.com/emqx/emqx/pull/9291).

View File

@ -2,6 +2,11 @@
## 增强
- 删除 Dashboard 监听器失败时日志中的无用信息 [#9260](https://github.com/emqx/emqx/pull/9260).
- 当 CoAP 网关给设备投递消息并收到设备发来的确认之后,回调 `'message.acked'` 钩子 [#9264](https://github.com/emqx/emqx/pull/9264)。
有了这个改动CoAP 网关可以配合 EMQX (企业版)的离线消息缓存功能,让 CoAP 设备重新上线之后,从数据库读取其离线状态下错过的消息。
- 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
@ -15,9 +20,26 @@
- 增强包含敏感数据的日志的安全性 [#9189](https://github.com/emqx/emqx/pull/9189)。
如果日志中包含敏感关键词,例如 `password`,那么关联的数据回被模糊化处理,替换成 `******`
- 增强 ACL 模块中的日志安全性,敏感数据将被模糊化 [#9242](https://github.com/emqx/emqx/pull/9242)。
- 增加 `management.bootstrap_apps_file` 配置,可以让 EMQX 初始化数据库时,从该文件批量导入一些 APP / Secret [#9273](https://github.com/emqx/emqx/pull/9273)。
- 增加了固化认证和 ACL 模块调用顺序的配置 [#9283](https://github.com/emqx/emqx/pull/9283)。
这两个新的全局配置名称为 `auth_order``acl_order`
当有多个认证或 ACL 插件(或模块)开启时,没有该配置的话,模块调用的顺序取决于它们的启动顺序。
例如,如果一个插件(或模块)在系统启动之后单独重启了,那么它就有可能排到其他插件(或模块)的后面去。
有了这个配置之后,用户可以使用用逗号分隔的插件(或模块)的名字(或别名)来固化他们被调用的顺序。
例如,`acl_order = jwt,http`,可以用于保证 `jwt` 这个模块总是排在 `http` 的前面,
也就是说,在对客户端进行 ACL 检查时,如果 JWT 不存在(或者没有定义 ACL那么回退到使用 HTTP。
- 为更多类型的 `client.disconnected` 事件(计数器触发)提供可配置项 [#9267](https://github.com/emqx/emqx/pull/9267)。
此前,`client.disconnected` 事件及计数器仅会在客户端正常断开连接或客户端被系统管理员踢出时触发,
但不会在旧 session 被废弃 (clean_session = true) 或旧 session 被接管 (clean_session = false) 时被触发。
可将 `broker.client_disconnect_discarded``broker.client_disconnect_takovered` 选项设置为 `on` 来启用此场景下的客户端断连事件。
## 修复
- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
- 修复若上传的备份文件名中包含非 ASCII 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)。
避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。
@ -42,4 +64,8 @@
注意在创建规则或资源时HTTP body 中的 `id` 字段仍为字面值,而不是编码之后的值。
详情请参考 [创建规则](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) 和 [创建资源](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources)。
- 修复调用 'DELETE /alarms/deactivated' 只在单个节点上生效的问题,现在将会删除所有节点上的非活跃警告。
- 修复调用 'DELETE /alarms/deactivated' 只在单个节点上生效的问题,现在将会删除所有节点上的非活跃警告 [#9280](https://github.com/emqx/emqx/pull/9280)。
- 在进行消息重发布或桥接消息到其他 mqtt broker 时,检查 topic 合法性,确定其不带有主题通配符 [#9291](https://github.com/emqx/emqx/pull/9291)。

View File

@ -698,6 +698,45 @@ acl_deny_action = ignore
## Value: Integer,Duration,Duration
flapping_detect_policy = 30, 1m, 5m
## When using multiple authentication backends, this config can be used to define the order.
## Default value "none" means no explicit ordering.
## Use comma to separate the names or aliases, for example "jwt,http" means "jwt" authentication should be checked before "http".
## Supported names are:
## 'http': emqx_auth_http
## 'jwt': emqx_auth_jwt
## 'ldap': emqx_auth_ldap
## 'mnesia': emqx_auth_mnesia
## 'mongo' (or 'mongodb'): emqx_auth_mongo
## 'mysql': emqx_auth_mysql
## 'pgsql' (or 'postgres'): emqx_auth_pgsql
## 'redis': emqx_auth_redis
## The specified backends are ordered prior to the unspecified ones,
## for example when "mnesia", "jwt" and "http" are in use,
## if only "jwt,http" is configured here, then "mnesia" is ordered after the other two.
## The name can also be the specific callback module name, e.g. my_auth_plugin_module,
## but it is silently discarded if there is no such module found in the system.
auth_order = none
## When using multiple ACL backends, this config can be used to define the order.
## Default value "none" means no explicit ordering.
## Use comma to separate the names or aliases, for example "jwt,http" means "jwt" ACL should be checked before "http".
## Supported names are:
## 'http': emqx_acl_http
## 'internal' (or 'file'): emqx_mod_acl_internal
## 'jwt': emqx_auth_jwt
## 'ldap': emqx_acl_ldap
## 'mnesia': emqx_acl_mnesia
## 'mongo' (or 'mongodb'): emqx_acl_mongo
## 'mysql': emqx_acl_mysql
## 'pgsql' (or 'postgres'): emqx_acl_pgsql
## 'redis': emqx_acl_redis
## The specified backends are ordered prior to the unspecified ones,
## for example when "mnesia", "jwt" and "http" are in use,
## if only "jwt,http" is configured here, then "mnesia" is ordered after the other two.
## The name can also be the specific callback module name, e.g. my_auth_plugin_module,
## but it is silently discarded if there is no such module found in the system.'
acl_order = none
##--------------------------------------------------------------------
## MQTT Protocol
##--------------------------------------------------------------------
@ -2416,6 +2455,16 @@ broker.route_batch_clean = off
## - false: disable trie path compaction
# broker.perf.trie_compaction = true
## Enable client disconnect event will be triggered by which reasons.
## Value: on | off
## `takeover`: session was takenover by another client with same client ID. (clean_session = false)
## Default: off
## `discard`: session was takeover by another client with same client ID. (clean_session = true)
## Default: off
##
# broker.client_disconnect_discarded = off
# broker.client_disconnect_takeovered = off
## CONFIG_SECTION_BGN=sys_mon ==================================================
## Enable Long GC monitoring. Disable if the value is 0.

View File

@ -54,7 +54,7 @@ groups() ->
{overview, [sequence], [t_overview]},
{admins, [sequence], [t_admins_add_delete, t_admins_persist_default_password, t_default_password_persists_after_leaving_cluster]},
{rest, [sequence], [t_rest_api]},
{cli, [sequence], [t_cli]}
{cli, [sequence], [t_cli, t_start_listener_failed_log]}
].
init_per_suite(Config) ->
@ -236,6 +236,21 @@ t_cli(_Config) ->
AdminList = emqx_dashboard_admin:all_users(),
?assertEqual(2, length(AdminList)).
t_start_listener_failed_log({init, Config}) ->
_ = application:stop(emqx_dashboard),
Config;
t_start_listener_failed_log({'end', _Config}) ->
_ = application:start(emqx_dashboard),
ok;
t_start_listener_failed_log(_Config) ->
ct:capture_start(),
Options = [{num_acceptors,4}, {max_connections,512}, {inet6,false}, {ipv6_v6only,false}],
?assertError(_, emqx_dashboard:start_listener({http, {"1.1.1.1", 8080}, Options})),
ct:capture_stop(),
I0 = ct:capture_get(),
?assertMatch({match, _}, re:run(iolist_to_binary(I0), "eaddrnotavail", [])),
ok.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------

View File

@ -23,6 +23,18 @@
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
%% do not start the application
%% only testing the root supervisor in this suite
application:stop(emqx_modules),
{ok, Pid} = emqx_mod_sup:start_link(),
unlink(Pid),
Config.
end_per_suite(_Config) ->
exit(whereis(emqx_mod_sup), kill),
ok.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------

View File

@ -804,6 +804,19 @@ end}.
%% Authentication/ACL
%%--------------------------------------------------------------------
%% @doc Define a determined authentication plugin/module check order.
%% see detailed doc in emqx.conf
{mapping, "auth_order", "emqx.auth_order", [
{default, "none"},
{datatype, string}
]}.
%% @doc Same as auth_order, but for ACL.
{mapping, "acl_order", "emqx.acl_order", [
{default, "none"},
{datatype, string}
]}.
%% @doc Allow anonymous authentication.
{mapping, "allow_anonymous", "emqx.allow_anonymous", [
{default, false},
@ -2494,6 +2507,20 @@ end}.
{datatype, {enum, [true, false]}}
]}.
%% @doc Configuration of disconnected event reason.
%% `takeover`: session was takenover by another client with same client ID. (clean_session = false)
%% `discard`: session was takeover by another client with same client ID. (clean_session = true)
{mapping, "broker.client_disconnect_discarded", "emqx.client_disconnect_discarded", [
{default, off},
{datatype, flag}
]}.
{mapping, "broker.client_disconnect_takeovered", "emqx.client_disconnect_takeovered", [
{default, off},
{datatype, flag}
]}.
%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------

View File

@ -50,7 +50,7 @@
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.11"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.2"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.9"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.10"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.4"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}

View File

@ -5,6 +5,29 @@ set -euo pipefail
# ensure dir
cd -P -- "$(dirname -- "$0")/.."
help() {
echo
echo "-h|--help: To display this usage info"
echo "--json: Print apps in json"
}
WANT_JSON='no'
while [ "$#" -gt 0 ]; do
case $1 in
-h|--help)
help
exit 0
;;
--json)
WANT_JSON='yes'
shift 1
;;
*)
echo "unknown option $1"
exit 1
;;
esac
done
if [ "$(./scripts/get-distro.sh)" = 'windows' ]; then
# Otherwise windows may resolve to find.exe
FIND="/usr/bin/find"
@ -17,17 +40,26 @@ find_app() {
"$FIND" "${appdir}" -mindepth 1 -maxdepth 1 -type d
}
# append emqx application first
echo 'emqx'
EM="emqx"
CE="$(find_app 'apps')"
find_app 'apps'
if [ -f 'EMQX_ENTERPRISE' ]; then
find_app 'lib-ee'
LIB="$(find_app 'lib-ee')"
else
find_app 'lib-ce'
LIB="$(find_app 'lib-ce')"
fi
## find directories in lib-extra
find_app 'lib-extra'
LIBE="$(find_app 'lib-extra')"
## find symlinks in lib-extra
"$FIND" 'lib-extra' -mindepth 1 -maxdepth 1 -type l -exec test -e {} \; -print
LIBES="$("$FIND" 'lib-extra' -mindepth 1 -maxdepth 1 -type l -exec test -e {} \; -print)"
APPS_ALL="$(echo -e "${EM}\n${CE}\n${LIB}\n${LIBE}\n${LIBES}")"
if [ "$WANT_JSON" = 'yes' ]; then
echo "${APPS_ALL}" | xargs | tr -d '\n' | jq -R -s -c 'split(" ")'
else
echo "${APPS_ALL}"
fi

View File

@ -2,14 +2,16 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.22",
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.3.21",
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
@ -26,7 +28,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}]},
{"4.3.20",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
@ -44,7 +47,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}]},
{"4.3.19",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
@ -63,7 +67,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.3.18",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
@ -82,7 +87,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}]},
{"4.3.17",
[{add_module,emqx_secret},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
@ -104,7 +110,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]},
{"4.3.16",
[{add_module,emqx_secret},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
@ -133,7 +140,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]}]},
{"4.3.15",
[{add_module,emqx_secret},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
@ -870,14 +878,16 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.22",
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.3.21",
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
@ -893,7 +903,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}]},
{"4.3.20",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
@ -910,7 +921,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}]},
{"4.3.19",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
@ -928,7 +940,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.3.18",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
@ -946,7 +959,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}]},
{"4.3.17",
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
@ -967,7 +981,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]},
{"4.3.16",
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
@ -995,7 +1010,8 @@
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription}]},
{"4.3.15",
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},

View File

@ -972,7 +972,13 @@ handle_call(discard, Channel) ->
disconnect_and_shutdown(discarded, ok, Channel);
%% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
handle_call({takeover, 'begin'}, Channel = #channel{
session = Session,
conninfo = #{clientid := ClientId}
}) ->
?tp(debug,
emqx_channel_takeover_begin,
#{clientid => ClientId}),
reply(Session, Channel#channel{takeover = true});
handle_call({takeover, 'end'}, Channel = #channel{session = Session,
@ -1698,7 +1704,16 @@ parse_topic_filters(TopicFilters) ->
lists:map(fun emqx_topic:parse/1, TopicFilters).
%%--------------------------------------------------------------------
%% Ensure disconnected
%% Maybe & Ensure disconnected
ensure_disconnected(connected, Reason, Channel)
when Reason =:= discarded orelse Reason =:= takeovered ->
case is_disconnect_event_enabled(Reason) of
true -> ensure_disconnected(Reason, Channel);
false -> Channel
end;
ensure_disconnected(_, _, Channel) ->
Channel.
ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
@ -1793,12 +1808,15 @@ shutdown(Reason, Reply, Channel) ->
shutdown(Reason, Reply, Packet, Channel) ->
{shutdown, Reason, Reply, Packet, Channel}.
%% mqtt v5 connected sessions
disconnect_and_shutdown(Reason, Reply, Channel = ?IS_MQTT_V5
= #channel{conn_state = connected}) ->
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel);
disconnect_and_shutdown(Reason, Reply, Channel) ->
shutdown(Reason, Reply, Channel).
NChannel = ensure_disconnected(connected, Reason, Channel),
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions
disconnect_and_shutdown(Reason, Reply, Channel= #channel{conn_state = ConnState}) ->
NChannel = ensure_disconnected(ConnState, Reason, Channel),
shutdown(Reason, Reply, NChannel).
sp(true) -> 1;
sp(false) -> 0.
@ -1806,6 +1824,11 @@ sp(false) -> 0.
flag(true) -> 1;
flag(false) -> 0.
is_disconnect_event_enabled(discarded) ->
emqx:get_env(client_disconnect_discarded, false);
is_disconnect_event_enabled(takeovered) ->
emqx:get_env(client_disconnect_takeovered, false).
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------

View File

@ -21,6 +21,10 @@
-include("logger.hrl").
-include("types.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-logger_header("[Hooks]").
-export([ start_link/0
@ -38,6 +42,8 @@
, run/2
, run_fold/3
, lookup/1
, reorder_acl_callbacks/0
, reorder_auth_callbacks/0
]).
-export([ callback_action/1
@ -86,6 +92,7 @@
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
-define(UNKNOWN_ORDER, 999999999).
-spec(start_link() -> startlink_ret()).
start_link() ->
@ -229,6 +236,16 @@ lookup(HookPoint) ->
[] -> []
end.
%% @doc Reorder ACL check callbacks
-spec reorder_acl_callbacks() -> ok.
reorder_acl_callbacks() ->
gen_server:cast(?SERVER, {reorder_callbacks, 'client.check_acl'}).
%% @doc Reorder Authentication check callbacks
-spec reorder_auth_callbacks() -> ok.
reorder_auth_callbacks() ->
gen_server:cast(?SERVER, {reorder_callbacks, 'client.authenticate'}).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
@ -238,11 +255,12 @@ init([]) ->
{ok, #{}}.
handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->
Reply = case lists:keymember(Action, #callback.action, Callbacks = lookup(HookPoint)) of
Callbacks = lookup(HookPoint),
Reply = case lists:keymember(Action, #callback.action, Callbacks) of
true ->
{error, already_exists};
false ->
insert_hook(HookPoint, add_callback(Callback, Callbacks))
ok = add_and_insert(HookPoint, [Callback], Callbacks)
end,
{reply, Reply, State};
@ -250,12 +268,22 @@ handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({reorder_callbacks, HookPoint}, State) ->
Callbacks = lookup(HookPoint),
case Callbacks =:= [] of
true ->
%% no callbaks, make sure not to insert []
ok;
false ->
ok = add_and_insert(HookPoint, Callbacks, [])
end,
{noreply, State};
handle_cast({del, HookPoint, Action}, State) ->
case del_callback(Action, lookup(HookPoint)) of
[] ->
ets:delete(?TAB, HookPoint);
Callbacks ->
insert_hook(HookPoint, Callbacks)
ok = insert_hook(HookPoint, Callbacks)
end,
{noreply, State};
@ -277,19 +305,92 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%------------------------------------------------------------------------------
add_and_insert(HookPoint, NewCallbacks, Callbacks) ->
HookOrder = get_hook_order(HookPoint),
NewCallbaks = add_callbacks(HookOrder, NewCallbacks, Callbacks),
ok = insert_hook(HookPoint, NewCallbaks).
get_hook_order('client.authenticate') ->
get_auth_acl_hook_order(auth_order);
get_hook_order('client.check_acl') ->
get_auth_acl_hook_order(acl_order);
get_hook_order(_) ->
[].
get_auth_acl_hook_order(AppEnvName) ->
case emqx:get_env(AppEnvName) of
[_|_] = CSV ->
%% non-empty string
parse_auth_acl_hook_order(AppEnvName, CSV);
_ ->
[]
end.
parse_auth_acl_hook_order(auth_order, CSV) ->
parse_auth_acl_hook_order(fun parse_auth_name/1, CSV);
parse_auth_acl_hook_order(acl_order, CSV) ->
parse_auth_acl_hook_order(fun parse_acl_name/1, CSV);
parse_auth_acl_hook_order(NameParser, CSV) when is_function(NameParser) ->
do_parse_auth_acl_hook_order(NameParser, string:tokens(CSV, ", ")).
do_parse_auth_acl_hook_order(_, []) -> [];
do_parse_auth_acl_hook_order(Parser, ["none" | Names]) ->
%% "none" is the default config value
do_parse_auth_acl_hook_order(Parser, Names);
do_parse_auth_acl_hook_order(Parser, [Name0 | Names]) ->
Name = Parser(Name0),
[Name | do_parse_auth_acl_hook_order(Parser, Names)].
%% NOTE: It's ugly to enumerate plugin names here.
%% But it's the most straightforward way.
parse_auth_name("http") -> "emqx_auth_http";
parse_auth_name("jwt") -> "emqx_auth_jwt";
parse_auth_name("ldap") -> "emqx_auth_ldap";
parse_auth_name("mnesia") -> "emqx_auth_mnesia";
parse_auth_name("mongodb") -> "emqx_auth_mongo";
parse_auth_name("mongo") -> "emqx_auth_mongo";
parse_auth_name("mysql") -> "emqx_auth_mysql";
parse_auth_name("pgsql") -> "emqx_auth_pgsql";
parse_auth_name("postgres") -> "emqx_auth_pgsql";
parse_auth_name("redis") -> "emqx_auth_redis";
parse_auth_name(Other) -> Other. %% maybe a user defined plugin or the module name directly
parse_acl_name("file") -> "emqx_mod_acl_internal";
parse_acl_name("internal") -> "emqx_mod_acl_internal";
parse_acl_name("http") -> "emqx_acl_http";
parse_acl_name("jwt") -> "emqx_auth_jwt"; %% this is not a typo, there is no emqx_acl_jwt module
parse_acl_name("ldap") -> "emqx_acl_ldap";
parse_acl_name("mnesia") -> "emqx_acl_mnesia";
parse_acl_name("mongo") -> "emqx_acl_mongo";
parse_acl_name("mongodb") -> "emqx_acl_mongo";
parse_acl_name("mysql") -> "emqx_acl_mysql";
parse_acl_name("pgsql") -> "emqx_acl_pgsql";
parse_acl_name("postgres") -> "emqx_acl_pgsql";
parse_acl_name("redis") -> "emqx_acl_redis";
parse_acl_name(Other) -> Other. %% maybe a user defined plugin or the module name directly
insert_hook(HookPoint, Callbacks) ->
ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok.
ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}),
ok.
add_callback(C, Callbacks) ->
add_callback(C, Callbacks, []).
add_callbacks(_Order, [], Callbacks) ->
Callbacks;
add_callbacks(Order, [C | More], Callbacks) ->
NewCallbacks = add_callback(Order, C, Callbacks),
add_callbacks(Order, More, NewCallbacks).
add_callback(C, [], Acc) ->
add_callback(Order, C, Callbacks) ->
add_callback(Order, C, Callbacks, []).
add_callback(_Order, C, [], Acc) ->
lists:reverse([C|Acc]);
add_callback(C1 = #callback{priority = P1}, [C2 = #callback{priority = P2}|More], Acc)
when P1 =< P2 ->
add_callback(C1, More, [C2|Acc]);
add_callback(C1, More, Acc) ->
lists:append(lists:reverse(Acc), [C1 | More]).
add_callback(Order, C1, [C2|More], Acc) ->
case is_lower_priority(Order, C1, C2) of
true ->
add_callback(Order, C1, More, [C2|Acc]);
false ->
lists:append(lists:reverse(Acc), [C1, C2 | More])
end.
del_callback(Action, Callbacks) ->
del_callback(Action, Callbacks, []).
@ -304,3 +405,144 @@ del_callback(Func, [#callback{action = {Func, _A}} | Callbacks], Acc) ->
del_callback(Func, Callbacks, Acc);
del_callback(Action, [Callback | Callbacks], Acc) ->
del_callback(Action, Callbacks, [Callback | Acc]).
%% does A have lower priority than B?
is_lower_priority(Order,
#callback{priority = PrA, action = ActA},
#callback{priority = PrB, action = ActB}) ->
PosA = callback_position(Order, ActA),
PosB = callback_position(Order, ActB),
case PosA =:= PosB of
true ->
%% When priority is equal, the new callback (A) goes after the existing (B) hence '=<'
PrA =< PrB;
false ->
%% When OrdA > OrdB the new callback (A) positioned after the exiting (B)
PosA > PosB
end.
callback_position(Order, Callback) ->
M = callback_module(Callback),
find_list_item_position(Order, atom_to_list(M)).
callback_module({M, _F, _A}) -> M;
callback_module({F, _A}) when is_function(F) ->
{module, M} = erlang:fun_info(F, module),
M;
callback_module(F) when is_function(F) ->
{module, M} = erlang:fun_info(F, module),
M.
find_list_item_position(Order, Name) ->
find_list_item_position(Order, Name, 1).
find_list_item_position([], _ModuleName, _N) ->
%% Not found, make sure it's ordered behind the found ones
?UNKNOWN_ORDER;
find_list_item_position([Prefix | Rest], ModuleName, N) ->
case is_prefix(Prefix, ModuleName) of
true ->
N;
false ->
find_list_item_position(Rest, ModuleName, N + 1)
end.
is_prefix(Prefix, ModuleName) ->
case string:prefix(ModuleName, Prefix) of
nomatch ->
false;
_Sufix ->
true
end.
-ifdef(TEST).
add_priority_rules_test_() ->
[{ "high prio",
fun() ->
OrderString = "foo, bar",
Existing = [make_hook(0, emqx_acl_pgsql), make_hook(0, emqx_acl_mysql)],
New = make_hook(1, emqx_acl_mnesia),
Expected = [New | Existing],
?assertEqual(Expected, test_add_acl(OrderString, New, Existing))
end},
{ "low prio",
fun() ->
OrderString = "foo, bar",
Existing = [make_hook(0, emqx_auth_jwt), make_hook(0, emqx_acl_mongo)],
New = make_hook(-1, emqx_acl_mnesia),
Expected = Existing++ [New],
?assertEqual(Expected, test_add_acl(OrderString, New, Existing))
end},
{ "mid prio",
fun() ->
OrderString = "",
Existing = [make_hook(3, emqx_acl_http), make_hook(1, emqx_acl_redis)],
New = make_hook(2, emqx_acl_ldap),
Expected = [hd(Existing), New | tl(Existing)],
?assertEqual(Expected, test_add_acl(OrderString, New, Existing))
end}
].
add_order_rules_test_() ->
[{"initial add",
fun() ->
OrderString = "ldap,pgsql,file",
Existing = [],
New = make_hook(2, foo),
?assertEqual([New], test_add_auth(OrderString, New, Existing))
end},
{ "before",
fun() ->
OrderString = "mongodb,postgres,internal",
Existing = [make_hook(1, emqx_auth_pgsql), make_hook(3, emqx_auth_mysql)],
New = make_hook(2, emqx_auth_mongo),
Expected = [New | Existing],
?assertEqual(Expected, test_add_auth(OrderString, New, Existing))
end},
{ "after",
fun() ->
OrderString = "mysql,postgres,ldap",
Existing = [make_hook(1, emqx_auth_pgsql), make_hook(3, emqx_auth_mysql)],
New = make_hook(2, emqx_auth_ldap),
Expected = Existing ++ [New],
?assertEqual(Expected, test_add_auth(OrderString, New, Existing))
end},
{ "unknown goes after knowns",
fun() ->
OrderString = "mongo,mysql,,mnesia", %% ,, is intended to test empty string
Existing = [make_hook(1, emqx_auth_mnesia), make_hook(3, emqx_auth_mysql)],
New1 = make_hook(2, fun() -> foo end), %% fake hook
New2 = make_hook(3, {fun lists:append/1, []}), %% fake hook
Expected1 = Existing ++ [New1],
Expected2 = Existing ++ [New2, New1], %% 2 is before 1 due to higher prio
?assertEqual(Expected1, test_add_auth(OrderString, New1, Existing)),
?assertEqual(Expected2, test_add_auth(OrderString, New2, Expected1))
end},
{ "known goes first",
fun() ->
OrderString = "redis,jwt",
Existing = [make_hook(1, emqx_auth_mnesia), make_hook(3, emqx_auth_mysql)],
Redis = make_hook(2, emqx_auth_redis),
Jwt = make_hook(2, emqx_auth_jwt),
Expected1 = [Redis | Existing],
?assertEqual(Expected1, test_add_auth(OrderString, Redis, Existing)),
Expected2 = [Redis, Jwt | Existing],
?assertEqual(Expected2, test_add_auth(OrderString, Jwt, Expected1))
end}
].
make_hook(Priority, CallbackModule) when is_atom(CallbackModule) ->
#callback{priority = Priority, action = {CallbackModule, dummy, []}};
make_hook(Priority, F) ->
#callback{priority = Priority, action = F}.
test_add_acl(OrderString, NewHook, ExistingHooks) ->
Order = parse_auth_acl_hook_order(acl_order, OrderString),
add_callback(Order, NewHook, ExistingHooks).
test_add_auth(OrderString, NewHook, ExistingHooks) ->
Order = parse_auth_acl_hook_order(auth_order, OrderString),
add_callback(Order, NewHook, ExistingHooks).
-endif.

View File

@ -822,15 +822,18 @@ t_enrich_connack_caps(_) ->
wildcard_subscription => true
}
end),
AckProps = emqx_channel:enrich_connack_caps(#{}, channel()),
?assertMatch(#{'Retain-Available' := 1,
'Maximum-Packet-Size' := 1024,
'Topic-Alias-Maximum' := 10,
'Wildcard-Subscription-Available' := 1,
'Subscription-Identifier-Available' := 1,
'Shared-Subscription-Available' := 1
}, AckProps),
ok = meck:unload(emqx_mqtt_caps).
try
AckProps = emqx_channel:enrich_connack_caps(#{}, channel()),
?assertMatch(#{'Retain-Available' := 1,
'Maximum-Packet-Size' := 1024,
'Topic-Alias-Maximum' := 10,
'Wildcard-Subscription-Available' := 1,
'Subscription-Identifier-Available' := 1,
'Shared-Subscription-Available' := 1
}, AckProps)
after
ok = meck:unload(emqx_mqtt_caps)
end.
%%--------------------------------------------------------------------
%% Test cases for terminate

View File

@ -23,21 +23,13 @@
all() -> emqx_ct:all(?MODULE).
% t_lookup(_) ->
% error('TODO').
init_per_testcase(_Test, Config) ->
Config.
% t_run_fold(_) ->
% error('TODO').
end_per_testcase(_Test, _Config) ->
_ = (catch emqx_hooks:stop()),
_ = clear_orders().
% t_run(_) ->
% error('TODO').
% t_del(_) ->
% error('TODO').
% t_add(_) ->
% error('TODO').
t_add_del_hook(_) ->
{ok, _} = emqx_hooks:start_link(),
ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []),
@ -107,6 +99,80 @@ t_uncovered_func(_) ->
Pid ! test,
ok = emqx_hooks:stop().
t_explicit_order_acl(_) ->
HookPoint = 'client.check_acl',
test_explicit_order(acl_order, HookPoint).
t_explicit_order_auth(_) ->
HookPoint = 'client.authenticate',
test_explicit_order(auth_order, HookPoint).
test_explicit_order(ConfigKey, HookPoint) ->
{ok, _} = emqx_hooks:start_link(),
ok = set_orders(ConfigKey, "mod_a, mod_b"),
ok = emqx:hook(HookPoint, {mod_c, cb, []}, 5),
ok = emqx:hook(HookPoint, {mod_d, cb, []}, 0),
ok = emqx:hook(HookPoint, {mod_b, cb, []}, 0),
ok = emqx:hook(HookPoint, {mod_a, cb, []}, -1),
ok = emqx:hook(HookPoint, {mod_e, cb, []}, -1),
?assertEqual(
[
{mod_a, cb, []},
{mod_b, cb, []},
{mod_c, cb, []},
{mod_d, cb, []},
{mod_e, cb, []}
],
get_hookpoint_callbacks(HookPoint)).
t_reorder_callbacks_acl(_) ->
F = fun emqx_hooks:reorder_acl_callbacks/0,
ok = emqx_hooks:reorder_auth_callbacks(),
test_reorder_callbacks(acl_order, 'client.check_acl', F).
t_reorder_callbacks_auth(_) ->
F = fun emqx_hooks:reorder_auth_callbacks/0,
test_reorder_callbacks(auth_order, 'client.authenticate', F).
test_reorder_callbacks(ConfigKey, HookPoint, ReorderFun) ->
{ok, _} = emqx_hooks:start_link(),
ok = set_orders(ConfigKey, "mod_a,mod_b,mod_c"),
ok = emqx:hook(HookPoint, fun mod_c:foo/1),
ok = emqx:hook(HookPoint, fun mod_a:foo/1),
ok = emqx:hook(HookPoint, fun mod_b:foo/1),
ok = emqx:hook(HookPoint, fun mod_y:foo/1),
ok = emqx:hook(HookPoint, fun mod_x:foo/1),
?assertEqual(
[fun mod_a:foo/1, fun mod_b:foo/1, fun mod_c:foo/1,
fun mod_y:foo/1, fun mod_x:foo/1
],
get_hookpoint_callbacks(HookPoint)),
ok = set_orders(ConfigKey, "mod_x,mod_a,mod_c,mod_b"),
ReorderFun(),
ignored = gen_server:call(emqx_hooks, x),
?assertEqual(
[fun mod_x:foo/1, fun mod_a:foo/1, fun mod_c:foo/1,
fun mod_b:foo/1, fun mod_y:foo/1
],
get_hookpoint_callbacks(HookPoint)),
ok.
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
set_orders(Key, OrderString) ->
application:set_env(emqx, Key, OrderString).
clear_orders() ->
application:set_env(emqx, acl_order, "none").
get_hookpoint_callbacks(HookPoint) ->
[emqx_hooks:callback_action(C) || C <- emqx_hooks:lookup(HookPoint)].
%%--------------------------------------------------------------------
%% Hook fun
%%--------------------------------------------------------------------
@ -140,4 +206,3 @@ hook_filter2(_, _Acc, _IntArg) -> false.
hook_filter2_1(arg, _Acc, init_arg) -> true;
hook_filter2_1(arg1, _Acc, init_arg) -> true;
hook_filter2_1(_, _Acc, _IntArg) -> false.

View File

@ -52,13 +52,15 @@ t_check_sub(_) ->
wildcard_subscription => false
},
emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
timer:sleep(50),
ClientInfo = #{zone => zone},
ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts),
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)),
?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)),
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})),
emqx_zone:unset_env(zone, '$mqtt_pub_caps').
try
ClientInfo = #{zone => zone},
ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts),
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)),
?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)),
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true}))
after
emqx_zone:unset_env(zone, '$mqtt_pub_caps')
end.

View File

@ -468,8 +468,8 @@ t_connack_max_qos_allowed(_) ->
%% max_qos_allowed = 0
emqx_zone:set_env(external, max_qos_allowed, 0),
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
emqx_zone:unset_env(external, '$mqtt_caps'),
emqx_zone:unset_env(external, '$mqtt_pub_caps'),
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, Connack1} = emqtt:connect(Client1),
@ -496,8 +496,8 @@ t_connack_max_qos_allowed(_) ->
%% max_qos_allowed = 1
emqx_zone:set_env(external, max_qos_allowed, 1),
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
emqx_zone:unset_env(external, '$mqtt_caps'),
emqx_zone:unset_env(external, '$mqtt_pub_caps'),
{ok, Client3} = emqtt:start_link([{proto_ver, v5}]),
{ok, Connack3} = emqtt:connect(Client3),
@ -524,8 +524,8 @@ t_connack_max_qos_allowed(_) ->
%% max_qos_allowed = 2
emqx_zone:set_env(external, max_qos_allowed, 2),
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
emqx_zone:unset_env(external, '$mqtt_caps'),
emqx_zone:unset_env(external, '$mqtt_pub_caps'),
{ok, Client5} = emqtt:start_link([{proto_ver, v5}]),
{ok, Connack5} = emqtt:connect(Client5),

View File

@ -32,6 +32,7 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:stop_apps([]).
init_per_testcase(_TestCase, Config) ->

View File

@ -51,7 +51,7 @@ init_per_suite(Config) ->
PortDiscovery = application:get_env(gen_rpc, port_discovery),
application:set_env(gen_rpc, port_discovery, stateless),
application:ensure_all_started(gen_rpc),
%% ensure emqx_moduels' app modules are loaded
%% ensure emqx_modules app modules are loaded
%% so the mnesia tables are created
ok = load_app(emqx_modules),
emqx_ct_helpers:start_apps([]),

View File

@ -32,25 +32,44 @@
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]),
ok.
init_per_testcase(Case, Config) ->
?MODULE:Case({'init', Config}).
end_per_testcase(Case, Config) ->
?MODULE:Case({'end', Config}).
%%--------------------------------------------------------------------
%% Testcases
t_takeover(_) ->
process_flag(trap_exit, true),
t_takeover({init, Config}) when is_list(Config) ->
Config;
t_takeover({'end', Config}) when is_list(Config) ->
ok;
t_takeover(Config) when is_list(Config) ->
AllMsgs = messages(?CNT),
Pos = rand:uniform(?CNT),
ClientId = <<"clientid">>,
{ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]),
{ok, _} = emqtt:connect(C1),
ClientId = random_clientid(),
ClientOpts = [{clientid, ClientId},
{clean_start, false},
{host, "127.0.0.1"},
{port, 1883}
],
C1 =
with_retry(
fun() ->
{ok, C} = emqtt:start_link(ClientOpts),
{ok, _} = emqtt:connect(C),
C
end, 5),
emqtt:subscribe(C1, <<"t">>, 1),
spawn(fun() ->
[begin
emqx:publish(lists:nth(I, AllMsgs)),
@ -59,31 +78,65 @@ t_takeover(_) ->
end),
emqtt:pause(C1),
timer:sleep(?CNT*10),
load_meck(ClientId),
spawn(fun() ->
[begin
emqx:publish(lists:nth(I, AllMsgs)),
timer:sleep(rand:uniform(10))
end || I <- lists:seq(Pos+1, ?CNT)]
end),
{ok, C2} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]),
{ok, _} = emqtt:connect(C2),
Received = all_received_publishs(),
ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]),
assert_messages_missed(AllMsgs, Received),
assert_messages_order(AllMsgs, Received),
emqtt:disconnect(C2),
unload_meck(ClientId).
t_takover_in_cluster(_) ->
todo.
try
spawn(fun() ->
[begin
emqx:publish(lists:nth(I, AllMsgs)),
timer:sleep(rand:uniform(10))
end || I <- lists:seq(Pos+1, ?CNT)]
end),
{ok, C2} = emqtt:start_link(ClientOpts),
%% C1 is going down, unlink it so the test can continue to run
_ = monitor(process, C1),
?assert(erlang:is_process_alive(C1)),
unlink(C1),
{ok, _} = emqtt:connect(C2),
receive
{'DOWN', _, process, C1, _} ->
ok
after 1000 ->
ct:fail("timedout_waiting_for_old_connection_shutdown")
end,
Received = all_received_publishs(),
ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]),
assert_messages_missed(AllMsgs, Received),
assert_messages_order(AllMsgs, Received),
kill_process(C2, fun emqtt:stop/1)
after
unload_meck(ClientId)
end.
%%--------------------------------------------------------------------
%% Helpers
random_clientid() ->
iolist_to_binary(["clientid", "-", integer_to_list(erlang:system_time())]).
kill_process(Pid, WithFun) ->
_ = unlink(Pid),
_ = monitor(process, Pid),
try WithFun(Pid)
catch _:_ -> ok
end,
receive
{'DOWN', _, process, Pid, _} ->
ok
after 10_000 ->
exit(Pid, kill),
error(timeout)
end.
with_retry(Fun, 1) -> Fun();
with_retry(Fun, N) when N > 1 ->
try
Fun()
catch
_ : _ ->
ct:sleep(1000),
with_retry(Fun, N - 1)
end.
load_meck(ClientId) ->
meck:new(fake_conn_mod, [non_strict]),
HookTakeover = fun(Pid, Msg = {takeover, 'begin'}) ->