Merge branch 'master' into release-50

This commit is contained in:
zhongwencool 2023-01-13 16:39:35 +08:00 committed by GitHub
commit 0049b4a294
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
110 changed files with 1006 additions and 928 deletions

View File

@ -1,81 +0,0 @@
name: 'Docker meta'
inputs:
profile:
required: true
type: string
registry:
required: true
type: string
arch:
required: true
type: string
otp:
required: true
type: string
elixir:
required: false
type: string
default: ''
builder_base:
required: true
type: string
owner:
required: true
type: string
docker_tags:
required: true
type: string
outputs:
emqx_name:
description: "EMQX name"
value: ${{ steps.pre-meta.outputs.emqx_name }}
version:
description: "docker image version"
value: ${{ steps.meta.outputs.version }}
tags:
description: "docker image tags"
value: ${{ steps.meta.outputs.tags }}
labels:
description: "docker image labels"
value: ${{ steps.meta.outputs.labels }}
runs:
using: composite
steps:
- name: prepare for docker/metadata-action
id: pre-meta
shell: bash
run: |
emqx_name=${{ inputs.profile }}
img_suffix=${{ inputs.arch }}
img_labels="org.opencontainers.image.otp.version=${{ inputs.otp }}"
if [ -n "${{ inputs.elixir }}" ]; then
emqx_name="emqx-elixir"
img_suffix="elixir-${{ inputs.arch }}"
img_labels="org.opencontainers.image.elixir.version=${{ inputs.elixir }}\n${img_labels}"
fi
if [ "${{ inputs.profile }}" = "emqx" ]; then
img_labels="org.opencontainers.image.edition=Opensource\n${img_labels}"
fi
if [ "${{ inputs.profile }}" = "emqx-enterprise" ]; then
img_labels="org.opencontainers.image.edition=Enterprise\n${img_labels}"
fi
if [[ "${{ inputs.builder_base }}" =~ "alpine" ]]; then
img_suffix="${img_suffix}-alpine"
fi
echo "emqx_name=${emqx_name}" >> $GITHUB_OUTPUT
echo "img_suffix=${img_suffix}" >> $GITHUB_OUTPUT
echo "img_labels=${img_labels}" >> $GITHUB_OUTPUT
echo "img_name=${{ inputs.registry }}/${{ inputs.owner }}/${{ inputs.profile }}" >> $GITHUB_OUTPUT
- uses: docker/metadata-action@v4
id: meta
with:
images:
${{ steps.pre-meta.outputs.img_name }}
flavor: |
suffix=-${{ steps.pre-meta.outputs.img_suffix }}
tags: |
type=raw,value=${{ inputs.docker_tags }}
labels:
${{ steps.pre-meta.outputs.img_labels }}

View File

@ -9,15 +9,17 @@ on:
tags: tags:
- v* - v*
- e* - e*
release: - docker-latest-*
types:
- published
workflow_dispatch: workflow_dispatch:
inputs: inputs:
branch_or_tag: branch_or_tag:
required: false required: false
profile: profile:
required: false required: false
default: 'emqx'
is_latest:
required: false
default: false
jobs: jobs:
prepare: prepare:
@ -26,10 +28,11 @@ jobs:
container: "ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-ubuntu20.04" container: "ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-ubuntu20.04"
outputs: outputs:
BUILD_PROFILE: ${{ steps.get_profile.outputs.BUILD_PROFILE }} PROFILE: ${{ steps.get_profile.outputs.PROFILE }}
IS_DOCKER_LATEST: ${{ steps.get_profile.outputs.IS_DOCKER_LATEST }} EDITION: ${{ steps.get_profile.outputs.EDITION }}
IS_LATEST: ${{ steps.get_profile.outputs.IS_LATEST }}
IS_EXACT_TAG: ${{ steps.get_profile.outputs.IS_EXACT_TAG }} IS_EXACT_TAG: ${{ steps.get_profile.outputs.IS_EXACT_TAG }}
DOCKER_TAG_VERSION: ${{ steps.get_profile.outputs.DOCKER_TAG_VERSION }} VERSION: ${{ steps.get_profile.outputs.VERSION }}
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
@ -45,14 +48,14 @@ jobs:
tag=${{ github.ref }} tag=${{ github.ref }}
# tag docker-latest-ce or docker-latest-ee # tag docker-latest-ce or docker-latest-ee
if git describe --tags --exact --match 'docker-latest-*' 2>/dev/null; then if git describe --tags --exact --match 'docker-latest-*' 2>/dev/null; then
echo 'docker_latest=true due to docker-latest-* tag' echo 'is_latest=true due to docker-latest-* tag'
docker_latest=true is_latest=true
elif [ "${{ github.event_name }}" = "release" ]; then elif [ "${{ inputs.is_latest }}" = "true" ]; then
echo 'docker_latest=true due to release' echo 'is_latest=true due to manual input from workflow_dispatch'
docker_latest=true is_latest=true
else else
echo 'docker_latest=false' echo 'is_latest=false'
docker_latest=false is_latest=false
fi fi
if git describe --tags --match "[v|e]*" --exact; then if git describe --tags --match "[v|e]*" --exact; then
echo "This is an exact git tag, will publish images" echo "This is an exact git tag, will publish images"
@ -64,18 +67,20 @@ jobs:
case $tag in case $tag in
refs/tags/v*) refs/tags/v*)
PROFILE='emqx' PROFILE='emqx'
EDITION='Opensource'
;; ;;
refs/tags/e*) refs/tags/e*)
PROFILE=emqx-enterprise PROFILE=emqx-enterprise
EDITION='Enterprise'
;; ;;
*) *)
PROFILE=${{ github.event.inputs.profile }} PROFILE=${{ github.event.inputs.profile }}
case "$PROFILE" in case "$PROFILE" in
emqx) emqx)
true EDITION='Opensource'
;; ;;
emqx-enterprise) emqx-enterprise)
true EDITION='Enterprise'
;; ;;
*) *)
echo "ERROR: Failed to resolve build profile" echo "ERROR: Failed to resolve build profile"
@ -85,14 +90,18 @@ jobs:
;; ;;
esac esac
VSN="$(./pkg-vsn.sh "$PROFILE")" VSN="$(./pkg-vsn.sh "$PROFILE")"
echo "Building $PROFILE image with tag $VSN (latest=$docker_latest)" echo "Building emqx/$PROFILE:$VSN image (latest=$is_latest)"
echo "IS_DOCKER_LATEST=$docker_latest" >> $GITHUB_OUTPUT echo "Push = $is_exact"
echo "IS_LATEST=$is_latest" >> $GITHUB_OUTPUT
echo "IS_EXACT_TAG=$is_exact" >> $GITHUB_OUTPUT echo "IS_EXACT_TAG=$is_exact" >> $GITHUB_OUTPUT
echo "BUILD_PROFILE=$PROFILE" >> $GITHUB_OUTPUT echo "PROFILE=$PROFILE" >> $GITHUB_OUTPUT
echo "DOCKER_TAG_VERSION=$VSN" >> $GITHUB_OUTPUT echo "EDITION=$EDITION" >> $GITHUB_OUTPUT
echo "VERSION=$VSN" >> $GITHUB_OUTPUT
- name: get_all_deps - name: get_all_deps
env:
PROFILE: ${{ steps.get_profile.outputs.PROFILE }}
run: | run: |
make -C source deps-all PROFILE=$PROFILE make -C source deps-$PROFILE
zip -ryq source.zip source/* source/.[^.]* zip -ryq source.zip source/* source/.[^.]*
- uses: actions/upload-artifact@v3 - uses: actions/upload-artifact@v3
with: with:
@ -100,17 +109,17 @@ jobs:
path: source.zip path: source.zip
docker: docker:
runs-on: ${{ matrix.arch[1] }} runs-on: ubuntu-20.04
needs: prepare needs: prepare
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
arch:
- [amd64, ubuntu-20.04]
- [arm64, aws-arm64]
profile: profile:
- ${{ needs.prepare.outputs.BUILD_PROFILE }} - "${{ needs.prepare.outputs.PROFILE }}"
flavor:
- ''
- '-elixir'
registry: registry:
- 'docker.io' - 'docker.io'
- 'public.ecr.aws' - 'public.ecr.aws'
@ -128,9 +137,10 @@ jobs:
exclude: # TODO: publish enterprise to ecr too? exclude: # TODO: publish enterprise to ecr too?
- registry: 'public.ecr.aws' - registry: 'public.ecr.aws'
profile: emqx-enterprise profile: emqx-enterprise
- flavor: '-elixir'
os: [alpine3.15.1, "alpine:3.15.1", "deploy/docker/Dockerfile.alpine"]
steps: steps:
- uses: AutoModality/action-clean@v1
if: matrix.arch[1] == 'aws-arm64'
- uses: actions/download-artifact@v3 - uses: actions/download-artifact@v3
with: with:
name: source name: source
@ -138,16 +148,17 @@ jobs:
- name: unzip source code - name: unzip source code
run: unzip -q source.zip run: unzip -q source.zip
- uses: docker/setup-qemu-action@v2
- uses: docker/setup-buildx-action@v2 - uses: docker/setup-buildx-action@v2
- name: Login for docker. - name: Login to hub.docker.com
uses: docker/login-action@v2 uses: docker/login-action@v2
if: matrix.registry == 'docker.io' if: matrix.registry == 'docker.io'
with: with:
username: ${{ secrets.DOCKER_HUB_USER }} username: ${{ secrets.DOCKER_HUB_USER }}
password: ${{ secrets.DOCKER_HUB_TOKEN }} password: ${{ secrets.DOCKER_HUB_TOKEN }}
- name: Login for AWS ECR - name: Login to AWS ECR
uses: docker/login-action@v2 uses: docker/login-action@v2
if: matrix.registry == 'public.ecr.aws' if: matrix.registry == 'public.ecr.aws'
with: with:
@ -155,230 +166,48 @@ jobs:
username: ${{ secrets.AWS_ACCESS_KEY_ID }} username: ${{ secrets.AWS_ACCESS_KEY_ID }}
password: ${{ secrets.AWS_SECRET_ACCESS_KEY }} password: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
ecr: true ecr: true
- name: prepare for docker/metadata-action
id: pre-meta
shell: bash
run: |
extra_labels=
img_suffix=
flavor="${{ matrix.flavor }}"
if [ "${{ matrix.flavor }}" = '-elixir' ]; then
img_suffix="-elixir"
extra_labels="org.opencontainers.image.elixir.version=${{ matrix.elixir }}"
fi
if [[ "${{ matrix.os[0] }}" =~ "alpine" ]]; then
img_suffix="${img_suffix}-alpine"
fi
- uses: ./source/.github/actions/docker-meta echo "img_suffix=$img_suffix" >> $GITHUB_OUTPUT
echo "extra_labels=$extra_labels" >> $GITHUB_OUTPUT
- uses: docker/metadata-action@v4
id: meta id: meta
with: with:
profile: ${{ matrix.profile }} images: |
registry: ${{ matrix.registry }} ${{ matrix.registry }}/${{ github.repository_owner }}/${{ matrix.profile }}
arch: ${{ matrix.arch[0] }} flavor: |
otp: ${{ matrix.otp }} suffix=${{ steps.pre-meta.outputs.img_suffix }}
builder_base: ${{ matrix.os[0] }} tags: |
owner: ${{ github.repository_owner }} type=raw,value=${{ needs.prepare.outputs.VERSION }}
docker_tags: ${{ needs.prepare.outputs.DOCKER_TAG_VERSION }} type=raw,value=latest,enable=${{ needs.prepare.outputs.IS_LATEST }}
labels: |
org.opencontainers.image.otp.version=${{ matrix.otp }}
org.opencontainers.image.edition=${{ needs.prepare.outputs.EDITION }}
${{ steps.pre-meta.outputs.extra_labels }}
- uses: docker/build-push-action@v3 - uses: docker/build-push-action@v3
with: with:
push: ${{ needs.prepare.outputs.IS_EXACT_TAG == 'true' || github.repository_owner != 'emqx' }} push: ${{ needs.prepare.outputs.IS_EXACT_TAG == 'true' || github.repository_owner != 'emqx' }}
pull: true pull: true
no-cache: true no-cache: true
platforms: linux/${{ matrix.arch[0] }} platforms: linux/amd64,linux/arm64
tags: ${{ steps.meta.outputs.tags }} tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }} labels: ${{ steps.meta.outputs.labels }}
build-args: | build-args: |
BUILD_FROM=ghcr.io/emqx/emqx-builder/${{ matrix.builder }}:${{ matrix.elixir }}-${{ matrix.otp }}-${{ matrix.os[0] }} EMQX_NAME=${{ matrix.profile }}${{ matrix.flavor }}
RUN_FROM=${{ matrix.os[1] }}
EMQX_NAME=${{ steps.meta.outputs.emqx_name }}
file: source/${{ matrix.os[2] }} file: source/${{ matrix.os[2] }}
context: source context: source
- name: Docker Hub Description
if: matrix.registry == 'docker.io'
uses: peter-evans/dockerhub-description@v3
with:
username: ${{ secrets.DOCKER_HUB_USER }}
password: ${{ secrets.DOCKER_HUB_TOKEN }}
repository: "emqx/${{ needs.prepare.outputs.BUILD_PROFILE }}"
readme-filepath: ./source/deploy/docker/README.md
short-description: "The most scalable open-source MQTT broker for IoT, IIoT, connected vehicles, and more."
docker-elixir:
runs-on: ${{ matrix.arch[1] }}
needs: prepare
# do not build elixir images for ee for now
if: needs.prepare.outputs.BUILD_PROFILE == 'emqx'
strategy:
fail-fast: false
matrix:
arch:
- [amd64, ubuntu-20.04]
- [arm64, aws-arm64]
profile:
- ${{ needs.prepare.outputs.BUILD_PROFILE }}
registry:
- 'docker.io'
os:
- [debian11, "debian:11-slim", "deploy/docker/Dockerfile"]
builder:
- 5.0-26 # update to latest
otp:
- 25.1.2-2 # update to latest
elixir:
- 1.13.4 # update to latest
steps:
- uses: AutoModality/action-clean@v1
if: matrix.arch[1] == 'aws-arm64'
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -q source.zip
- uses: docker/setup-buildx-action@v2
- name: Login for docker.
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_HUB_USER }}
password: ${{ secrets.DOCKER_HUB_TOKEN }}
- uses: ./source/.github/actions/docker-meta
id: meta
with:
profile: ${{ matrix.profile }}
registry: ${{ matrix.registry }}
arch: ${{ matrix.arch[0] }}
otp: ${{ matrix.otp }}
elixir: ${{ matrix.elixir }}
builder_base: ${{ matrix.os[0] }}
owner: ${{ github.repository_owner }}
docker_tags: ${{ needs.prepare.outputs.DOCKER_TAG_VERSION }}
- uses: docker/build-push-action@v3
with:
push: ${{ needs.prepare.outputs.IS_EXACT_TAG == 'true' || github.repository_owner != 'emqx' }}
pull: true
no-cache: true
platforms: linux/${{ matrix.arch[0] }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-args: |
BUILD_FROM=ghcr.io/emqx/emqx-builder/${{ matrix.builder }}:${{ matrix.elixir }}-${{ matrix.otp }}-${{ matrix.os[0] }}
RUN_FROM=${{ matrix.os[1] }}
EMQX_NAME=${{ steps.meta.outputs.emqx_name }}
file: source/${{ matrix.os[2] }}
context: source
docker-push-multi-arch-manifest:
# note, we only run on amd64
if: needs.prepare.outputs.IS_EXACT_TAG
needs:
- prepare
- docker
runs-on: ${{ matrix.arch[1] }}
strategy:
fail-fast: false
matrix:
arch:
- [amd64, ubuntu-20.04]
profile:
- ${{ needs.prepare.outputs.BUILD_PROFILE }}
os:
- [alpine3.15.1, "alpine:3.15.1", "deploy/docker/Dockerfile.alpine"]
- [debian11, "debian:11-slim", "deploy/docker/Dockerfile"]
# NOTE: only support latest otp version, not a matrix
otp:
- 24.3.4.2-1 # switch to 25 once ready to release 5.1
registry:
- 'docker.io'
- 'public.ecr.aws'
exclude:
- registry: 'public.ecr.aws'
profile: emqx-enterprise
steps:
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -q source.zip
- uses: docker/login-action@v2
if: matrix.registry == 'docker.io'
with:
username: ${{ secrets.DOCKER_HUB_USER }}
password: ${{ secrets.DOCKER_HUB_TOKEN }}
- uses: docker/login-action@v2
if: matrix.registry == 'public.ecr.aws'
with:
registry: public.ecr.aws
username: ${{ secrets.AWS_ACCESS_KEY_ID }}
password: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
ecr: true
- uses: ./source/.github/actions/docker-meta
id: meta
with:
profile: ${{ matrix.profile }}
registry: ${{ matrix.registry }}
arch: ${{ matrix.arch[0] }}
otp: ${{ matrix.otp }}
builder_base: ${{ matrix.os[0] }}
owner: ${{ github.repository_owner }}
docker_tags: ${{ needs.prepare.outputs.DOCKER_TAG_VERSION }}
- name: update manifest for multiarch image
working-directory: source
run: |
is_latest="${{ needs.prepare.outputs.IS_DOCKER_LATEST }}"
scripts/docker-create-push-manifests.sh "${{ steps.meta.outputs.tags }}" "$is_latest"
docker-elixir-push-multi-arch-manifest:
# note, we only run on amd64
# do not build enterprise elixir images for now
if: needs.prepare.outputs.IS_EXACT_TAG == 'true' && needs.prepare.outputs.BUILD_PROFILE == 'emqx'
needs:
- prepare
- docker-elixir
runs-on: ${{ matrix.arch[1] }}
strategy:
fail-fast: false
matrix:
arch:
- [amd64, ubuntu-20.04]
profile:
- ${{ needs.prepare.outputs.BUILD_PROFILE }}
# NOTE: for docker, only support latest otp version, not a matrix
otp:
- 25.1.2-2 # update to latest
elixir:
- 1.13.4 # update to latest
registry:
- 'docker.io'
steps:
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -q source.zip
- uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_HUB_USER }}
password: ${{ secrets.DOCKER_HUB_TOKEN }}
- uses: ./source/.github/actions/docker-meta
id: meta
with:
profile: ${{ matrix.profile }}
registry: ${{ matrix.registry }}
arch: ${{ matrix.arch[0] }}
otp: ${{ matrix.otp }}
elixir: ${{ matrix.elixir }}
builder_base: ${{ matrix.os[0] }}
owner: ${{ github.repository_owner }}
docker_tags: ${{ needs.prepare.outputs.DOCKER_TAG_VERSION }}
- name: update manifest for multiarch image
working-directory: source
run: |
scripts/docker-create-push-manifests.sh "${{ steps.meta.outputs.tags }}" false

View File

@ -201,12 +201,25 @@ jobs:
echo "waiting emqx started"; echo "waiting emqx started";
sleep 10; sleep 10;
done done
- name: Get Token
timeout-minutes: 1
run: |
kubectl port-forward service/${{ matrix.profile }} 18083:18083 > /dev/null &
while
[ "$(curl --silent -X 'GET' 'http://127.0.0.1:18083/api/v5/status' | tail -n1)" != "emqx is running" ]
do
echo "waiting emqx"
sleep 1
done
echo "TOKEN=$(curl --silent -X 'POST' 'http://127.0.0.1:18083/api/v5/login' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{"username": "admin","password": "public"}' | jq -r ".token")" >> $GITHUB_ENV
- name: Check cluster - name: Check cluster
timeout-minutes: 10 timeout-minutes: 10
run: | run: |
kubectl port-forward service/${{ matrix.profile }} 18083:18083 > /dev/null &
while while
[ "$(curl --silent --basic -u admin:public -X GET http://127.0.0.1:18083/api/v5/cluster| jq '.nodes|length')" != "3" ]; [ "$(curl --silent -H "Authorization: Bearer $TOKEN" -X GET http://127.0.0.1:18083/api/v5/cluster| jq '.nodes|length')" != "3" ];
do do
echo "waiting ${{ matrix.profile }} cluster scale" echo "waiting ${{ matrix.profile }} cluster scale"
sleep 1 sleep 1

View File

@ -92,7 +92,7 @@ jobs:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
with: with:
repository: emqx/emqx-fvt repository: emqx/emqx-fvt
ref: broker-autotest-v2 ref: broker-autotest-v4
path: scripts path: scripts
- uses: actions/setup-java@v3 - uses: actions/setup-java@v3
with: with:
@ -191,7 +191,7 @@ jobs:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
with: with:
repository: emqx/emqx-fvt repository: emqx/emqx-fvt
ref: broker-autotest-v2 ref: broker-autotest-v4
path: scripts path: scripts
- uses: actions/setup-java@v3 - uses: actions/setup-java@v3
with: with:
@ -297,7 +297,7 @@ jobs:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
with: with:
repository: emqx/emqx-fvt repository: emqx/emqx-fvt
ref: broker-autotest-v2 ref: broker-autotest-v4
path: scripts path: scripts
- uses: actions/setup-java@v3 - uses: actions/setup-java@v3
with: with:
@ -396,7 +396,7 @@ jobs:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
with: with:
repository: emqx/emqx-fvt repository: emqx/emqx-fvt
ref: broker-autotest-v2 ref: broker-autotest-v4
path: scripts path: scripts
- name: run jwks_server - name: run jwks_server
timeout-minutes: 10 timeout-minutes: 10
@ -496,7 +496,7 @@ jobs:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
with: with:
repository: emqx/emqx-fvt repository: emqx/emqx-fvt
ref: broker-autotest-v2 ref: broker-autotest-v4
path: scripts path: scripts
- uses: actions/setup-java@v3 - uses: actions/setup-java@v3
with: with:

View File

@ -7,7 +7,7 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh) export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
export EMQX_DASHBOARD_VERSION ?= v1.1.5 export EMQX_DASHBOARD_VERSION ?= v1.1.5
export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.9 export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.12
export EMQX_REL_FORM ?= tgz export EMQX_REL_FORM ?= tgz
export QUICER_DOWNLOAD_FROM_RELEASE = 1 export QUICER_DOWNLOAD_FROM_RELEASE = 1
ifeq ($(OS),Windows_NT) ifeq ($(OS),Windows_NT)

View File

@ -2050,7 +2050,7 @@ base_listener_enable_authn {
Set <code>true</code> (default) to enable client authentication on this listener, the authentication Set <code>true</code> (default) to enable client authentication on this listener, the authentication
process goes through the configured authentication chain. process goes through the configured authentication chain.
When set to <code>false</code> to allow any clients with or without authentication information such as username or password to log in. When set to <code>false</code> to allow any clients with or without authentication information such as username or password to log in.
When set to <code>quick_deny_anonymous<code>, it behaves like when set to <code>true</code> but clients will be When set to <code>quick_deny_anonymous</code>, it behaves like when set to <code>true</code>, but clients will be
denied immediately without going through any authenticators if <code>username</code> is not provided. This is useful to fence off denied immediately without going through any authenticators if <code>username</code> is not provided. This is useful to fence off
anonymous clients early. anonymous clients early.
""" """

View File

@ -15,10 +15,8 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% HTTP API Auth %% HTTP API Auth
-define(WRONG_USERNAME_OR_PWD, 'WRONG_USERNAME_OR_PWD'). -define(BAD_USERNAME_OR_PWD, 'BAD_USERNAME_OR_PWD').
-define(WRONG_USERNAME_OR_PWD_OR_API_KEY_OR_API_SECRET, -define(BAD_API_KEY_OR_SECRET, 'BAD_API_KEY_OR_SECRET').
'WRONG_USERNAME_OR_PWD_OR_API_KEY_OR_API_SECRET'
).
%% Bad Request %% Bad Request
-define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_REQUEST, 'BAD_REQUEST').
@ -57,8 +55,8 @@
%% All codes %% All codes
-define(ERROR_CODES, [ -define(ERROR_CODES, [
{'WRONG_USERNAME_OR_PWD', <<"Wrong username or pwd">>}, {?BAD_USERNAME_OR_PWD, <<"Bad username or password">>},
{'WRONG_USERNAME_OR_PWD_OR_API_KEY_OR_API_SECRET', <<"Wrong username & pwd or key & secret">>}, {?BAD_API_KEY_OR_SECRET, <<"Bad API key or secret">>},
{'BAD_REQUEST', <<"Request parameters are not legal">>}, {'BAD_REQUEST', <<"Request parameters are not legal">>},
{'NOT_MATCH', <<"Conditions are not matched">>}, {'NOT_MATCH', <<"Conditions are not matched">>},
{'ALREADY_EXISTS', <<"Resource already existed">>}, {'ALREADY_EXISTS', <<"Resource already existed">>},

View File

@ -21,7 +21,8 @@
format_path/1, format_path/1,
check/2, check/2,
format_error/1, format_error/1,
format_error/2 format_error/2,
make_schema/1
]). ]).
%% @doc Format hocon config field path to dot-separated string in iolist format. %% @doc Format hocon config field path to dot-separated string in iolist format.
@ -79,6 +80,9 @@ format_error({_Schema, [#{kind := K} = First | Rest] = All}, Opts) when
format_error(_Other, _) -> format_error(_Other, _) ->
false. false.
make_schema(Fields) ->
#{roots => Fields, fields => #{}}.
%% Ensure iolist() %% Ensure iolist()
iol(B) when is_binary(B) -> B; iol(B) when is_binary(B) -> B;
iol(A) when is_atom(A) -> atom_to_binary(A, utf8); iol(A) when is_atom(A) -> atom_to_binary(A, utf8);

View File

@ -111,15 +111,19 @@
comma_separated_atoms/0 comma_separated_atoms/0
]). ]).
-export([namespace/0, roots/0, roots/1, fields/1, desc/1]). -export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
-export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]).
-export([server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1]). -export([server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1]).
-export([authz_fields/0]).
-export([sc/2, map/2]). -export([sc/2, map/2]).
-elvis([{elvis_style, god_modules, disable}]). -elvis([{elvis_style, god_modules, disable}]).
namespace() -> broker. namespace() -> broker.
tags() ->
[<<"EMQX">>].
roots() -> roots() ->
%% TODO change config importance to a field metadata %% TODO change config importance to a field metadata
roots(high) ++ roots(medium) ++ roots(low). roots(high) ++ roots(medium) ++ roots(low).
@ -323,31 +327,7 @@ fields("stats") ->
)} )}
]; ];
fields("authorization") -> fields("authorization") ->
[ authz_fields();
{"no_match",
sc(
hoconsc:enum([allow, deny]),
#{
default => allow,
required => true,
desc => ?DESC(fields_authorization_no_match)
}
)},
{"deny_action",
sc(
hoconsc:enum([ignore, disconnect]),
#{
default => ignore,
required => true,
desc => ?DESC(fields_authorization_deny_action)
}
)},
{"cache",
sc(
ref(?MODULE, "cache"),
#{}
)}
];
fields("cache") -> fields("cache") ->
[ [
{"enable", {"enable",
@ -2088,6 +2068,33 @@ do_default_ciphers(_) ->
%% otherwise resolve default ciphers list at runtime %% otherwise resolve default ciphers list at runtime
[]. [].
authz_fields() ->
[
{"no_match",
sc(
hoconsc:enum([allow, deny]),
#{
default => allow,
required => true,
desc => ?DESC(fields_authorization_no_match)
}
)},
{"deny_action",
sc(
hoconsc:enum([ignore, disconnect]),
#{
default => ignore,
required => true,
desc => ?DESC(fields_authorization_deny_action)
}
)},
{"cache",
sc(
ref(?MODULE, "cache"),
#{}
)}
].
%% @private return a list of keys in a parent field %% @private return a list of keys in a parent field
-spec keys(string(), hocon:config()) -> [string()]. -spec keys(string(), hocon:config()) -> [string()].
keys(Parent, Conf) -> keys(Parent, Conf) ->
@ -2342,7 +2349,7 @@ authentication(Which) ->
undefined -> hoconsc:array(typerefl:map()); undefined -> hoconsc:array(typerefl:map());
Module -> Module:root_type() Module -> Module:root_type()
end, end,
%% It is a lazy type because when handing runtime update requests %% It is a lazy type because when handling runtime update requests
%% the config is not checked by emqx_schema, but by the injected schema %% the config is not checked by emqx_schema, but by the injected schema
Type = hoconsc:lazy(Type0), Type = hoconsc:lazy(Type0),
#{ #{

View File

@ -29,6 +29,9 @@
auth_header/2 auth_header/2
]). ]).
-define(DEFAULT_APP_ID, <<"default_appid">>).
-define(DEFAULT_APP_SECRET, <<"default_app_secret">>).
request_api(Method, Url, Auth) -> request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []). request_api(Method, Url, [], Auth, []).
@ -74,12 +77,18 @@ auth_header(User, Pass) ->
{"Authorization", "Basic " ++ Encoded}. {"Authorization", "Basic " ++ Encoded}.
default_auth_header() -> default_auth_header() ->
AppId = <<"myappid">>, {ok, #{api_key := APIKey}} = emqx_mgmt_auth:read(?DEFAULT_APP_ID),
AppSecret = emqx_mgmt_auth:get_appsecret(AppId), auth_header(
auth_header(erlang:binary_to_list(AppId), erlang:binary_to_list(AppSecret)). erlang:binary_to_list(APIKey), erlang:binary_to_list(?DEFAULT_APP_SECRET)
).
create_default_app() -> create_default_app() ->
emqx_mgmt_auth:add_app(<<"myappid">>, <<"test">>). Now = erlang:system_time(second),
ExpiredAt = Now + timer:minutes(10),
emqx_mgmt_auth:create(
?DEFAULT_APP_ID, ?DEFAULT_APP_SECRET, true, ExpiredAt, <<"default app key for test">>
),
ok.
delete_default_app() -> delete_default_app() ->
emqx_mgmt_auth:del_app(<<"myappid">>). emqx_mgmt_auth:delete(?DEFAULT_APP_ID).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_authn, [ {application, emqx_authn, [
{description, "EMQX Authentication"}, {description, "EMQX Authentication"},
{vsn, "0.1.11"}, {vsn, "0.1.12"},
{modules, []}, {modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]}, {registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [kernel, stdlib, emqx_resource, emqx_connector, ehttpc, epgsql, mysql, jose]}, {applications, [kernel, stdlib, emqx_resource, emqx_connector, ehttpc, epgsql, mysql, jose]},

View File

@ -22,6 +22,7 @@
-export([ -export([
common_fields/0, common_fields/0,
roots/0, roots/0,
tags/0,
fields/1, fields/1,
authenticator_type/0, authenticator_type/0,
authenticator_type_without_scram/0, authenticator_type_without_scram/0,
@ -32,6 +33,9 @@
roots() -> []. roots() -> [].
tags() ->
[<<"Authentication">>].
common_fields() -> common_fields() ->
[{enable, fun enable/1}]. [{enable, fun enable/1}].

View File

@ -25,6 +25,7 @@
-export([ -export([
namespace/0, namespace/0,
tags/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1 desc/1
@ -105,6 +106,9 @@ mnesia(boot) ->
namespace() -> "authn-scram-builtin_db". namespace() -> "authn-scram-builtin_db".
tags() ->
[<<"Authentication">>].
roots() -> [?CONF_NS]. roots() -> [?CONF_NS].
fields(?CONF_NS) -> fields(?CONF_NS) ->

View File

@ -26,6 +26,7 @@
-export([ -export([
namespace/0, namespace/0,
tags/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1, desc/1,
@ -51,6 +52,9 @@
namespace() -> "authn-http". namespace() -> "authn-http".
tags() ->
[<<"Authentication">>].
roots() -> roots() ->
[ [
{?CONF_NS, {?CONF_NS,

View File

@ -25,6 +25,7 @@
-export([ -export([
namespace/0, namespace/0,
tags/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1 desc/1
@ -44,6 +45,9 @@
namespace() -> "authn-jwt". namespace() -> "authn-jwt".
tags() ->
[<<"Authentication">>].
roots() -> roots() ->
[ [
{?CONF_NS, {?CONF_NS,

View File

@ -26,6 +26,7 @@
-export([ -export([
namespace/0, namespace/0,
tags/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1 desc/1
@ -107,6 +108,9 @@ mnesia(boot) ->
namespace() -> "authn-builtin_db". namespace() -> "authn-builtin_db".
tags() ->
[<<"Authentication">>].
roots() -> [?CONF_NS]. roots() -> [?CONF_NS].
fields(?CONF_NS) -> fields(?CONF_NS) ->

View File

@ -25,6 +25,7 @@
-export([ -export([
namespace/0, namespace/0,
tags/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1 desc/1
@ -44,6 +45,9 @@
namespace() -> "authn-mongodb". namespace() -> "authn-mongodb".
tags() ->
[<<"Authentication">>].
roots() -> roots() ->
[ [
{?CONF_NS, {?CONF_NS,

View File

@ -27,6 +27,7 @@
-export([ -export([
namespace/0, namespace/0,
tags/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1 desc/1
@ -46,6 +47,9 @@
namespace() -> "authn-mysql". namespace() -> "authn-mysql".
tags() ->
[<<"Authentication">>].
roots() -> [?CONF_NS]. roots() -> [?CONF_NS].
fields(?CONF_NS) -> fields(?CONF_NS) ->

View File

@ -26,6 +26,7 @@
-export([ -export([
namespace/0, namespace/0,
tags/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1 desc/1
@ -50,6 +51,9 @@
namespace() -> "authn-postgresql". namespace() -> "authn-postgresql".
tags() ->
[<<"Authentication">>].
roots() -> [?CONF_NS]. roots() -> [?CONF_NS].
fields(?CONF_NS) -> fields(?CONF_NS) ->

View File

@ -25,6 +25,7 @@
-export([ -export([
namespace/0, namespace/0,
tags/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1 desc/1
@ -44,6 +45,9 @@
namespace() -> "authn-redis". namespace() -> "authn-redis".
tags() ->
[<<"Authentication">>].
roots() -> roots() ->
[ [
{?CONF_NS, {?CONF_NS,

View File

@ -18,7 +18,8 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1, multipart_formdata_request/3]). -import(emqx_dashboard_api_test_helpers, [multipart_formdata_request/3]).
-import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-include("emqx_authn.hrl"). -include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -65,9 +66,8 @@ end_per_testcase(_, Config) ->
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_config:erase(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY), emqx_config:erase(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY),
_ = application:load(emqx_conf), _ = application:load(emqx_conf),
ok = emqx_common_test_helpers:start_apps( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_authn, emqx_dashboard], [emqx_authn]
fun set_special_configs/1
), ),
?AUTHN:delete_chain(?GLOBAL), ?AUTHN:delete_chain(?GLOBAL),
@ -76,12 +76,7 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authn]), emqx_mgmt_api_test_util:end_suite([emqx_authn]),
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(_App) ->
ok. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -197,7 +197,7 @@ t_list_users(_) ->
#{is_superuser := false, user_id := _}, #{is_superuser := false, user_id := _},
#{is_superuser := false, user_id := _} #{is_superuser := false, user_id := _}
], ],
meta := #{page := 1, limit := 2, count := 3} meta := #{page := 1, limit := 2, count := 3, hasnext := true}
} = emqx_authn_mnesia:list_users( } = emqx_authn_mnesia:list_users(
#{<<"page">> => 1, <<"limit">> => 2}, #{<<"page">> => 1, <<"limit">> => 2},
State State
@ -205,7 +205,7 @@ t_list_users(_) ->
#{ #{
data := [#{is_superuser := false, user_id := _}], data := [#{is_superuser := false, user_id := _}],
meta := #{page := 2, limit := 2, count := 3} meta := #{page := 2, limit := 2, count := 3, hasnext := false}
} = emqx_authn_mnesia:list_users( } = emqx_authn_mnesia:list_users(
#{<<"page">> => 2, <<"limit">> => 2}, #{<<"page">> => 2, <<"limit">> => 2},
State State
@ -213,7 +213,7 @@ t_list_users(_) ->
#{ #{
data := [#{is_superuser := false, user_id := <<"u3">>}], data := [#{is_superuser := false, user_id := <<"u3">>}],
meta := #{page := 1, limit := 20, count := 0} meta := #{page := 1, limit := 20, hasnext := false}
} = emqx_authn_mnesia:list_users( } = emqx_authn_mnesia:list_users(
#{ #{
<<"page">> => 1, <<"page">> => 1,

View File

@ -300,14 +300,14 @@ t_list_users(_) ->
#{ #{
data := [?USER_MAP, ?USER_MAP], data := [?USER_MAP, ?USER_MAP],
meta := #{page := 1, limit := 2, count := 3} meta := #{page := 1, limit := 2, count := 3, hasnext := true}
} = emqx_enhanced_authn_scram_mnesia:list_users( } = emqx_enhanced_authn_scram_mnesia:list_users(
#{<<"page">> => 1, <<"limit">> => 2}, #{<<"page">> => 1, <<"limit">> => 2},
State State
), ),
#{ #{
data := [?USER_MAP], data := [?USER_MAP],
meta := #{page := 2, limit := 2, count := 3} meta := #{page := 2, limit := 2, count := 3, hasnext := false}
} = emqx_enhanced_authn_scram_mnesia:list_users( } = emqx_enhanced_authn_scram_mnesia:list_users(
#{<<"page">> => 2, <<"limit">> => 2}, #{<<"page">> => 2, <<"limit">> => 2},
State State
@ -319,7 +319,7 @@ t_list_users(_) ->
is_superuser := _ is_superuser := _
} }
], ],
meta := #{page := 1, limit := 3, count := 0} meta := #{page := 1, limit := 3, hasnext := false}
} = emqx_enhanced_authn_scram_mnesia:list_users( } = emqx_enhanced_authn_scram_mnesia:list_users(
#{ #{
<<"page">> => 1, <<"page">> => 1,

View File

@ -15,7 +15,6 @@ authz:{
pool_size: 1 pool_size: 1
username: root username: root
password: public password: public
auto_reconnect: true
ssl: { ssl: {
enable: true enable: true
cacertfile: "etc/certs/cacert.pem" cacertfile: "etc/certs/cacert.pem"
@ -33,7 +32,6 @@ authz:{
pool_size: 1 pool_size: 1
username: root username: root
password: public password: public
auto_reconnect: true
ssl: {enable: false} ssl: {enable: false}
} }
sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = ${peerhost} or username = ${username} or username = '$all' or clientid = ${clientid}" sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = ${peerhost} or username = ${username} or username = '$all' or clientid = ${clientid}"
@ -45,7 +43,6 @@ authz:{
database: 0 database: 0
pool_size: 1 pool_size: 1
password: public password: public
auto_reconnect: true
ssl: {enable: false} ssl: {enable: false}
} }
cmd: "HGETALL mqtt_authz:${username}" cmd: "HGETALL mqtt_authz:${username}"

View File

@ -1,6 +1,7 @@
authorization { authorization {
deny_action = ignore deny_action = ignore
no_match = allow no_match = allow
cache = { enable = true }
sources = [ sources = [
{ {
type = file type = file

View File

@ -64,7 +64,7 @@ schema("/authorization/settings") ->
}. }.
ref_authz_schema() -> ref_authz_schema() ->
proplists:delete(sources, emqx_conf_schema:fields("authorization")). emqx_schema:authz_fields().
settings(get, _Params) -> settings(get, _Params) ->
{200, authorization_settings()}; {200, authorization_settings()};
@ -83,4 +83,6 @@ settings(put, #{
{200, authorization_settings()}. {200, authorization_settings()}.
authorization_settings() -> authorization_settings() ->
maps:remove(<<"sources">>, emqx:get_raw_config([authorization], #{})). C = maps:remove(<<"sources">>, emqx:get_raw_config([authorization], #{})),
Schema = emqx_hocon:make_schema(emqx_schema:authz_fields()),
hocon_tconf:make_serializable(Schema, C, #{}).

View File

@ -449,7 +449,7 @@ is_ok(ResL) ->
get_raw_sources() -> get_raw_sources() ->
RawSources = emqx:get_raw_config([authorization, sources], []), RawSources = emqx:get_raw_config([authorization, sources], []),
Schema = #{roots => emqx_authz_schema:fields("authorization"), fields => #{}}, Schema = emqx_hocon:make_schema(emqx_authz_schema:authz_fields()),
Conf = #{<<"sources">> => RawSources}, Conf = #{<<"sources">> => RawSources},
#{<<"sources">> := Sources} = hocon_tconf:make_serializable(Schema, Conf, #{}), #{<<"sources">> := Sources} = hocon_tconf:make_serializable(Schema, Conf, #{}),
merge_default_headers(Sources). merge_default_headers(Sources).

View File

@ -33,9 +33,11 @@
-export([ -export([
namespace/0, namespace/0,
roots/0, roots/0,
tags/0,
fields/1, fields/1,
validations/0, validations/0,
desc/1 desc/1,
authz_fields/0
]). ]).
-export([ -export([
@ -65,28 +67,15 @@ type_names() ->
namespace() -> authz. namespace() -> authz.
tags() ->
[<<"Authorization">>].
%% @doc authorization schema is not exported %% @doc authorization schema is not exported
%% but directly used by emqx_schema %% but directly used by emqx_schema
roots() -> []. roots() -> [].
fields("authorization") -> fields("authorization") ->
Types = [?R_REF(Type) || Type <- type_names()], authz_fields();
UnionMemberSelector =
fun
(all_union_members) -> Types;
%% must return list
({value, Value}) -> [select_union_member(Value)]
end,
[
{sources,
?HOCON(
?ARRAY(?UNION(UnionMemberSelector)),
#{
default => [],
desc => ?DESC(sources)
}
)}
];
fields(file) -> fields(file) ->
authz_common_fields(file) ++ authz_common_fields(file) ++
[{path, ?HOCON(string(), #{required => true, desc => ?DESC(path)})}]; [{path, ?HOCON(string(), #{required => true, desc => ?DESC(path)})}];
@ -488,3 +477,22 @@ select_union_member_loop(TypeValue, [Type | Types]) ->
false -> false ->
select_union_member_loop(TypeValue, Types) select_union_member_loop(TypeValue, Types)
end. end.
authz_fields() ->
Types = [?R_REF(Type) || Type <- type_names()],
UnionMemberSelector =
fun
(all_union_members) -> Types;
%% must return list
({value, Value}) -> [select_union_member(Value)]
end,
[
{sources,
?HOCON(
?ARRAY(?UNION(UnionMemberSelector)),
#{
default => [],
desc => ?DESC(sources)
}
)}
].

View File

@ -18,7 +18,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/2, uri/1]). -import(emqx_mgmt_api_test_util, [request/2, uri/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
@ -32,8 +32,8 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_authz, emqx_dashboard, emqx_management], [emqx_conf, emqx_authz],
fun set_special_configs/1 fun set_special_configs/1
), ),
Config. Config.
@ -47,7 +47,7 @@ end_per_suite(_Config) ->
<<"sources">> => [] <<"sources">> => []
} }
), ),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf, emqx_management]), emqx_mgmt_api_test_util:end_suite([emqx_authz, emqx_conf]),
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
@ -67,12 +67,12 @@ t_clean_cahce(_) ->
ok = emqtt:publish(C, <<"a/b/c">>, <<"{\"x\":1,\"y\":1}">>, 0), ok = emqtt:publish(C, <<"a/b/c">>, <<"{\"x\":1,\"y\":1}">>, 0),
{ok, 200, Result3} = request(get, uri(["clients", "emqx0", "authorization", "cache"])), {ok, 200, Result3} = request(get, uri(["clients", "emqx0", "authorization", "cache"])),
?assertEqual(2, length(jsx:decode(Result3))), ?assertEqual(2, length(emqx_json:decode(Result3))),
request(delete, uri(["authorization", "cache"])), request(delete, uri(["authorization", "cache"])),
{ok, 200, Result4} = request(get, uri(["clients", "emqx0", "authorization", "cache"])), {ok, 200, Result4} = request(get, uri(["clients", "emqx0", "authorization", "cache"])),
?assertEqual(0, length(jsx:decode(Result4))), ?assertEqual(0, length(emqx_json:decode(Result4))),
ok. ok.

View File

@ -22,7 +22,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]). -import(emqx_mgmt_api_test_util, [request/3, uri/1]).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -31,8 +31,8 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_authz, emqx_dashboard], [emqx_conf, emqx_authz],
fun set_special_configs/1 fun set_special_configs/1
), ),
Config. Config.
@ -46,7 +46,7 @@ end_per_suite(_Config) ->
<<"sources">> => [] <<"sources">> => []
} }
), ),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]), emqx_mgmt_api_test_util:end_suite([emqx_authz, emqx_conf]),
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
@ -92,7 +92,8 @@ t_api(_) ->
<<"meta">> := #{ <<"meta">> := #{
<<"count">> := 1, <<"count">> := 1,
<<"limit">> := 100, <<"limit">> := 100,
<<"page">> := 1 <<"page">> := 1,
<<"hasnext">> := false
} }
} = jsx:decode(Request1), } = jsx:decode(Request1),
?assertEqual(3, length(Rules1)), ?assertEqual(3, length(Rules1)),
@ -111,9 +112,9 @@ t_api(_) ->
#{ #{
<<"data">> := [], <<"data">> := [],
<<"meta">> := #{ <<"meta">> := #{
<<"count">> := 0,
<<"limit">> := 20, <<"limit">> := 20,
<<"page">> := 1 <<"page">> := 1,
<<"hasnext">> := false
} }
} = jsx:decode(Request1_1), } = jsx:decode(Request1_1),

View File

@ -18,7 +18,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]). -import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
@ -30,7 +30,7 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_authz, emqx_dashboard], [emqx_conf, emqx_authz, emqx_dashboard],
fun set_special_configs/1 fun set_special_configs/1
), ),
@ -46,7 +46,7 @@ end_per_suite(_Config) ->
} }
), ),
ok = stop_apps([emqx_resource]), ok = stop_apps([emqx_resource]),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]), emqx_mgmt_api_test_util:end_suite([emqx_authz, emqx_conf]),
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->

View File

@ -18,7 +18,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]). -import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
@ -115,8 +115,8 @@ init_per_suite(Config) ->
end end
), ),
ok = emqx_common_test_helpers:start_apps( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_authz, emqx_dashboard], [emqx_conf, emqx_authz],
fun set_special_configs/1 fun set_special_configs/1
), ),
ok = start_apps([emqx_resource]), ok = start_apps([emqx_resource]),
@ -134,7 +134,7 @@ end_per_suite(_Config) ->
%% resource and connector should be stop first, %% resource and connector should be stop first,
%% or authz_[mysql|pgsql|redis..]_SUITE would be failed %% or authz_[mysql|pgsql|redis..]_SUITE would be failed
ok = stop_apps([emqx_resource]), ok = stop_apps([emqx_resource]),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]), emqx_mgmt_api_test_util:end_suite([emqx_authz, emqx_conf]),
meck:unload(emqx_resource), meck:unload(emqx_resource),
ok. ok.

View File

@ -93,9 +93,8 @@ init_per_suite(Config) ->
" }" " }"
>> >>
), ),
emqx_common_test_helpers:start_apps( emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_dashboard, ?APP], [emqx_conf, ?APP]
fun set_special_configs/1
), ),
Config. Config.
@ -111,12 +110,6 @@ end_per_testcase(t_get_basic_usage_info, _Config) ->
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
ok. ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config(),
ok;
set_special_configs(_) ->
ok.
topic_config(T) -> topic_config(T) ->
#{ #{
topic => T, topic => T,
@ -132,7 +125,7 @@ end_per_suite(_) ->
application:unload(?APP), application:unload(?APP),
meck:unload(emqx_resource), meck:unload(emqx_resource),
meck:unload(emqx_schema), meck:unload(emqx_schema),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_conf, ?APP]). emqx_mgmt_api_test_util:end_suite([emqx_conf, ?APP]).
t_auto_subscribe(_) -> t_auto_subscribe(_) ->
emqx_auto_subscribe:update([#{<<"topic">> => Topic} || Topic <- ?TOPICS]), emqx_auto_subscribe:update([#{<<"topic">> => Topic} || Topic <- ?TOPICS]),

View File

@ -20,7 +20,7 @@
-import(hoconsc, [mk/2, ref/2]). -import(hoconsc, [mk/2, ref/2]).
-export([roots/0, fields/1, desc/1, namespace/0]). -export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
-export([ -export([
get_response/0, get_response/0,
@ -104,6 +104,9 @@ metrics_status_fields() ->
namespace() -> "bridge". namespace() -> "bridge".
tags() ->
[<<"Bridge">>].
roots() -> [bridges]. roots() -> [bridges].
fields(bridges) -> fields(bridges) ->

View File

@ -18,7 +18,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/4, uri/1]). -import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
@ -60,9 +60,8 @@ init_per_suite(Config) ->
%% some testcases (may from other app) already get emqx_connector started %% some testcases (may from other app) already get emqx_connector started
_ = application:stop(emqx_resource), _ = application:stop(emqx_resource),
_ = application:stop(emqx_connector), _ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_rule_engine, emqx_bridge, emqx_dashboard], [emqx_rule_engine, emqx_bridge]
fun set_special_configs/1
), ),
ok = emqx_common_test_helpers:load_config( ok = emqx_common_test_helpers:load_config(
emqx_rule_engine_schema, emqx_rule_engine_schema,
@ -72,12 +71,7 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_dashboard]), emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]),
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config(<<"bridge_admin">>);
set_special_configs(_) ->
ok. ok.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
@ -605,9 +599,6 @@ t_with_redact_update(_Config) ->
?assertEqual(Password, Value), ?assertEqual(Password, Value),
ok. ok.
request(Method, Url, Body) ->
request(<<"bridge_admin">>, Method, Url, Body).
operation_path(node, Oper, BridgeID) -> operation_path(node, Oper, BridgeID) ->
uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]); uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]);
operation_path(cluster, Oper, BridgeID) -> operation_path(cluster, Oper, BridgeID) ->

View File

@ -38,7 +38,9 @@
cipher/0 cipher/0
]). ]).
-export([namespace/0, roots/0, fields/1, translations/0, translation/1, validations/0, desc/1]). -export([
namespace/0, roots/0, fields/1, translations/0, translation/1, validations/0, desc/1, tags/0
]).
-export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]).
%% Static apps which merge their configs into the merged emqx.conf %% Static apps which merge their configs into the merged emqx.conf
@ -67,6 +69,9 @@
%% root config should not have a namespace %% root config should not have a namespace
namespace() -> undefined. namespace() -> undefined.
tags() ->
[<<"EMQX">>].
roots() -> roots() ->
PtKey = ?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, PtKey = ?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY,
case persistent_term:get(PtKey, undefined) of case persistent_term:get(PtKey, undefined) of
@ -942,8 +947,8 @@ fields("log_burst_limit") ->
)} )}
]; ];
fields("authorization") -> fields("authorization") ->
emqx_schema:fields("authorization") ++ emqx_schema:authz_fields() ++
emqx_authz_schema:fields("authorization"). emqx_authz_schema:authz_fields().
desc("cluster") -> desc("cluster") ->
?DESC("desc_cluster"); ?DESC("desc_cluster");

View File

@ -14,7 +14,7 @@ An MySQL connector can be used as following:
``` ```
(emqx@127.0.0.1)5> emqx_resource:list_instances_verbose(). (emqx@127.0.0.1)5> emqx_resource:list_instances_verbose().
[#{config => [#{config =>
#{auto_reconnect => true,cacertfile => [],certfile => [], #{cacertfile => [],certfile => [],
database => "mqtt",keyfile => [],password => "public", database => "mqtt",keyfile => [],password => "public",
pool_size => 1, pool_size => 1,
server => {{127,0,0,1},3306}, server => {{127,0,0,1},3306},

View File

@ -68,13 +68,13 @@ emqx_connector_schema_lib {
auto_reconnect { auto_reconnect {
desc { desc {
en: "Enable automatic reconnect to the database." en: "Deprecated. Enable automatic reconnect to the database."
zh: "自动重连数据库。" zh: "已弃用。自动重连数据库。"
} }
label: { label: {
en: "Auto Reconnect Database" en: "Deprecated. Auto Reconnect Database"
zh: "自动重连数据库" zh: "已弃用。自动重连数据库"
} }
} }
} }

View File

@ -24,6 +24,8 @@
-define(REDIS_DEFAULT_PORT, 6379). -define(REDIS_DEFAULT_PORT, 6379).
-define(PGSQL_DEFAULT_PORT, 5432). -define(PGSQL_DEFAULT_PORT, 5432).
-define(AUTO_RECONNECT_INTERVAL, 2).
-define(SERVERS_DESC, -define(SERVERS_DESC,
"A Node list for Cluster to connect to. The nodes should be separated with commas, such as: `Node[,Node].`<br/>" "A Node list for Cluster to connect to. The nodes should be separated with commas, such as: `Node[,Node].`<br/>"
"For each Node should be: " "For each Node should be: "

View File

@ -12,7 +12,7 @@
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}}, {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}},
{epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.2"}}}, {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.2"}}},
%% NOTE: mind poolboy version when updating mongodb-erlang version %% NOTE: mind poolboy version when updating mongodb-erlang version
{mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.18"}}}, {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.19"}}},
%% NOTE: mind poolboy version when updating eredis_cluster version %% NOTE: mind poolboy version when updating eredis_cluster version
{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.5"}}}, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.5"}}},
%% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git

View File

@ -209,7 +209,7 @@ on_start(
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_http_connector", msg => "starting_http_connector",
connector => InstId, connector => InstId,
config => Config config => emqx_misc:redact(Config)
}), }),
{Transport, TransportOpts} = {Transport, TransportOpts} =
case Scheme of case Scheme of

View File

@ -59,14 +59,13 @@ on_start(
bind_password := BindPassword, bind_password := BindPassword,
timeout := Timeout, timeout := Timeout,
pool_size := PoolSize, pool_size := PoolSize,
auto_reconnect := AutoReconn,
ssl := SSL ssl := SSL
} = Config } = Config
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_ldap_connector", msg => "starting_ldap_connector",
connector => InstId, connector => InstId,
config => Config config => emqx_misc:redact(Config)
}), }),
Servers = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS), Servers = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS),
SslOpts = SslOpts =
@ -86,11 +85,11 @@ on_start(
{bind_password, BindPassword}, {bind_password, BindPassword},
{timeout, Timeout}, {timeout, Timeout},
{pool_size, PoolSize}, {pool_size, PoolSize},
{auto_reconnect, reconn_interval(AutoReconn)} {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
], ],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of
ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; ok -> {ok, #{poolname => PoolName}};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
@ -129,9 +128,6 @@ on_query(InstId, {search, Base, Filter, Attributes}, #{poolname := PoolName} = S
on_get_status(_InstId, _State) -> connected. on_get_status(_InstId, _State) -> connected.
reconn_interval(true) -> 15;
reconn_interval(false) -> false.
search(Conn, Base, Filter, Attributes) -> search(Conn, Base, Filter, Attributes) ->
eldap2:search(Conn, [ eldap2:search(Conn, [
{base, Base}, {base, Base},

View File

@ -155,7 +155,7 @@ on_start(
rs -> "starting_mongodb_replica_set_connector"; rs -> "starting_mongodb_replica_set_connector";
sharded -> "starting_mongodb_sharded_connector" sharded -> "starting_mongodb_sharded_connector"
end, end,
?SLOG(info, #{msg => Msg, connector => InstId, config => Config}), ?SLOG(info, #{msg => Msg, connector => InstId, config => emqx_misc:redact(Config)}),
NConfig = #{hosts := Hosts} = maybe_resolve_srv_and_txt_records(Config), NConfig = #{hosts := Hosts} = maybe_resolve_srv_and_txt_records(Config),
SslOpts = SslOpts =
case maps:get(enable, SSL) of case maps:get(enable, SSL) of

View File

@ -15,6 +15,8 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_connector_mqtt). -module(emqx_connector_mqtt).
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -147,7 +149,7 @@ on_start(InstId, Conf) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_mqtt_connector", msg => "starting_mqtt_connector",
connector => InstanceId, connector => InstanceId,
config => Conf config => emqx_misc:redact(Conf)
}), }),
BasicConf = basic_config(Conf), BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{ BridgeConf = BasicConf#{
@ -198,12 +200,10 @@ on_query_async(
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}). emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}).
on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> on_get_status(_InstId, #{name := InstanceId}) ->
AutoReconn = maps:get(auto_reconnect, Conf, true),
case emqx_connector_mqtt_worker:status(InstanceId) of case emqx_connector_mqtt_worker:status(InstanceId) of
connected -> connected; connected -> connected;
_ when AutoReconn == true -> connecting; _ -> connecting
_ when AutoReconn == false -> disconnected
end. end.
ensure_mqtt_worker_started(InstanceId, BridgeConf) -> ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
@ -236,7 +236,6 @@ make_forward_confs(FrowardConf) ->
basic_config( basic_config(
#{ #{
server := Server, server := Server,
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer, proto_ver := ProtoVer,
bridge_mode := BridgeMode, bridge_mode := BridgeMode,
clean_start := CleanStart, clean_start := CleanStart,
@ -252,7 +251,7 @@ basic_config(
%% 30s %% 30s
connect_timeout => 30, connect_timeout => 30,
auto_reconnect => true, auto_reconnect => true,
reconnect_interval => ReconnIntv, reconnect_interval => ?AUTO_RECONNECT_INTERVAL,
proto_ver => ProtoVer, proto_ver => ProtoVer,
%% Opening bridge_mode will form a non-standard mqtt connection message. %% Opening bridge_mode will form a non-standard mqtt connection message.
%% A load balancing server (such as haproxy) is often set up before the emqx broker server. %% A load balancing server (such as haproxy) is often set up before the emqx broker server.

View File

@ -52,7 +52,6 @@
-type state() :: -type state() ::
#{ #{
poolname := atom(), poolname := atom(),
auto_reconnect := boolean(),
prepare_statement := prepares(), prepare_statement := prepares(),
params_tokens := params_tokens(), params_tokens := params_tokens(),
batch_inserts := sqls(), batch_inserts := sqls(),
@ -84,8 +83,6 @@ on_start(
server := Server, server := Server,
database := DB, database := DB,
username := User, username := User,
password := Password,
auto_reconnect := AutoReconn,
pool_size := PoolSize, pool_size := PoolSize,
ssl := SSL ssl := SSL
} = Config } = Config
@ -94,7 +91,7 @@ on_start(
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_mysql_connector", msg => "starting_mysql_connector",
connector => InstId, connector => InstId,
config => Config config => emqx_misc:redact(Config)
}), }),
SslOpts = SslOpts =
case maps:get(enable, SSL) of case maps:get(enable, SSL) of
@ -107,14 +104,14 @@ on_start(
{host, Host}, {host, Host},
{port, Port}, {port, Port},
{user, User}, {user, User},
{password, Password}, {password, maps:get(password, Config, <<>>)},
{database, DB}, {database, DB},
{auto_reconnect, reconn_interval(AutoReconn)}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize} {pool_size, PoolSize}
], ],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
Prepares = parse_prepare_sql(Config), Prepares = parse_prepare_sql(Config),
State = maps:merge(#{poolname => PoolName, auto_reconnect => AutoReconn}, Prepares), State = maps:merge(#{poolname => PoolName}, Prepares),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State)}; {ok, init_prepare(State)};
@ -194,7 +191,7 @@ mysql_function(prepared_query) ->
mysql_function(_) -> mysql_function(_) ->
mysql_function(prepared_query). mysql_function(prepared_query).
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) -> on_get_status(_InstId, #{poolname := Pool} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
true -> true ->
case do_check_prepares(State) of case do_check_prepares(State) of
@ -205,10 +202,10 @@ on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State
{connected, NState}; {connected, NState};
{error, _Reason} -> {error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn %% do not log error, it is logged in prepare_sql_to_conn
conn_status(AutoReconn) connecting
end; end;
false -> false ->
conn_status(AutoReconn) connecting
end. end.
do_get_status(Conn) -> do_get_status(Conn) ->
@ -227,11 +224,6 @@ do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, P
end. end.
%% =================================================================== %% ===================================================================
conn_status(_AutoReconn = true) -> connecting;
conn_status(_AutoReconn = false) -> disconnected.
reconn_interval(true) -> 15;
reconn_interval(false) -> false.
connect(Options) -> connect(Options) ->
mysql:start_link(Options). mysql:start_link(Options).

View File

@ -56,7 +56,6 @@
-type state() :: -type state() ::
#{ #{
poolname := atom(), poolname := atom(),
auto_reconnect := boolean(),
prepare_sql := prepares(), prepare_sql := prepares(),
params_tokens := params_tokens(), params_tokens := params_tokens(),
prepare_statement := epgsql:statement() prepare_statement := epgsql:statement()
@ -87,8 +86,6 @@ on_start(
server := Server, server := Server,
database := DB, database := DB,
username := User, username := User,
password := Password,
auto_reconnect := AutoReconn,
pool_size := PoolSize, pool_size := PoolSize,
ssl := SSL ssl := SSL
} = Config } = Config
@ -97,7 +94,7 @@ on_start(
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_postgresql_connector", msg => "starting_postgresql_connector",
connector => InstId, connector => InstId,
config => Config config => emqx_misc:redact(Config)
}), }),
SslOpts = SslOpts =
case maps:get(enable, SSL) of case maps:get(enable, SSL) of
@ -113,14 +110,14 @@ on_start(
{host, Host}, {host, Host},
{port, Port}, {port, Port},
{username, User}, {username, User},
{password, emqx_secret:wrap(Password)}, {password, emqx_secret:wrap(maps:get(password, Config, ""))},
{database, DB}, {database, DB},
{auto_reconnect, reconn_interval(AutoReconn)}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize} {pool_size, PoolSize}
], ],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
Prepares = parse_prepare_sql(Config), Prepares = parse_prepare_sql(Config),
InitState = #{poolname => PoolName, auto_reconnect => AutoReconn, prepare_statement => #{}}, InitState = #{poolname => PoolName, prepare_statement => #{}},
State = maps:merge(InitState, Prepares), State = maps:merge(InitState, Prepares),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
@ -247,7 +244,7 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
end, end,
Result. Result.
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) -> on_get_status(_InstId, #{poolname := Pool} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
true -> true ->
case do_check_prepares(State) of case do_check_prepares(State) of
@ -258,10 +255,10 @@ on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State
{connected, NState}; {connected, NState};
false -> false ->
%% do not log error, it is logged in prepare_sql_to_conn %% do not log error, it is logged in prepare_sql_to_conn
conn_status(AutoReconn) connecting
end; end;
false -> false ->
conn_status(AutoReconn) connecting
end. end.
do_get_status(Conn) -> do_get_status(Conn) ->
@ -280,11 +277,6 @@ do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepare
end. end.
%% =================================================================== %% ===================================================================
conn_status(_AutoReconn = true) -> connecting;
conn_status(_AutoReconn = false) -> disconnected.
reconn_interval(true) -> 15;
reconn_interval(false) -> false.
connect(Opts) -> connect(Opts) ->
Host = proplists:get_value(host, Opts), Host = proplists:get_value(host, Opts),

View File

@ -117,14 +117,13 @@ on_start(
#{ #{
redis_type := Type, redis_type := Type,
pool_size := PoolSize, pool_size := PoolSize,
auto_reconnect := AutoReconn,
ssl := SSL ssl := SSL
} = Config } = Config
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_redis_connector", msg => "starting_redis_connector",
connector => InstId, connector => InstId,
config => Config config => emqx_misc:redact(Config)
}), }),
ConfKey = ConfKey =
case Type of case Type of
@ -142,7 +141,7 @@ on_start(
[ [
{pool_size, PoolSize}, {pool_size, PoolSize},
{password, maps:get(password, Config, "")}, {password, maps:get(password, Config, "")},
{auto_reconnect, reconn_interval(AutoReconn)} {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
] ++ Database ++ Servers, ] ++ Database ++ Servers,
Options = Options =
case maps:get(enable, SSL) of case maps:get(enable, SSL) of
@ -155,7 +154,7 @@ on_start(
[{ssl, false}] [{ssl, false}]
end ++ [{sentinel, maps:get(sentinel, Config, undefined)}], end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
State = #{poolname => PoolName, type => Type, auto_reconnect => AutoReconn}, State = #{poolname => PoolName, type => Type},
case Type of case Type of
cluster -> cluster ->
case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of
@ -229,18 +228,18 @@ eredis_cluster_workers_exist_and_are_connected(Workers) ->
Workers Workers
). ).
on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect := AutoReconn}) -> on_get_status(_InstId, #{type := cluster, poolname := PoolName}) ->
case eredis_cluster:pool_exists(PoolName) of case eredis_cluster:pool_exists(PoolName) of
true -> true ->
Workers = extract_eredis_cluster_workers(PoolName), Workers = extract_eredis_cluster_workers(PoolName),
Health = eredis_cluster_workers_exist_and_are_connected(Workers), Health = eredis_cluster_workers_exist_and_are_connected(Workers),
status_result(Health, AutoReconn); status_result(Health);
false -> false ->
disconnected disconnected
end; end;
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) -> on_get_status(_InstId, #{poolname := Pool}) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1),
status_result(Health, AutoReconn). status_result(Health).
do_get_status(Conn) -> do_get_status(Conn) ->
case eredis:q(Conn, ["PING"]) of case eredis:q(Conn, ["PING"]) of
@ -248,12 +247,8 @@ do_get_status(Conn) ->
_ -> false _ -> false
end. end.
status_result(_Status = true, _AutoReconn) -> connected; status_result(_Status = true) -> connected;
status_result(_Status = false, _AutoReconn = true) -> connecting; status_result(_Status = false) -> connecting.
status_result(_Status = false, _AutoReconn = false) -> disconnected.
reconn_interval(true) -> 15;
reconn_interval(false) -> false.
do_cmd(PoolName, cluster, {cmd, Command}) -> do_cmd(PoolName, cluster, {cmd, Command}) ->
eredis_cluster:q(PoolName, Command); eredis_cluster:q(PoolName, Command);

View File

@ -106,4 +106,5 @@ password(_) -> undefined.
auto_reconnect(type) -> boolean(); auto_reconnect(type) -> boolean();
auto_reconnect(desc) -> ?DESC("auto_reconnect"); auto_reconnect(desc) -> ?DESC("auto_reconnect");
auto_reconnect(default) -> true; auto_reconnect(default) -> true;
auto_reconnect(deprecated) -> {since, "v5.0.15"};
auto_reconnect(_) -> undefined. auto_reconnect(_) -> undefined.

View File

@ -65,8 +65,12 @@ start_listeners(Listeners) ->
components => #{ components => #{
schemas => #{}, schemas => #{},
'securitySchemes' => #{ 'securitySchemes' => #{
'basicAuth' => #{type => http, scheme => basic}, 'basicAuth' => #{
'bearerAuth' => #{type => http, scheme => bearer} type => http,
scheme => basic,
description =>
<<"Authorize with [API Keys](https://www.emqx.io/docs/en/v5.0/admin/api.html#api-keys)">>
}
} }
} }
}, },
@ -215,28 +219,7 @@ listener_name(Protocol) ->
authorize(Req) -> authorize(Req) ->
case cowboy_req:parse_header(<<"authorization">>, Req) of case cowboy_req:parse_header(<<"authorization">>, Req) of
{basic, Username, Password} -> {basic, Username, Password} ->
case emqx_dashboard_admin:check(Username, Password) of api_key_authorize(Req, Username, Password);
ok ->
ok;
{error, <<"username_not_found">>} ->
Path = cowboy_req:path(Req),
case emqx_mgmt_auth:authorize(Path, Username, Password) of
ok ->
ok;
{error, <<"not_allowed">>} ->
return_unauthorized(
?WRONG_USERNAME_OR_PWD,
<<"Check username/password">>
);
{error, _} ->
return_unauthorized(
?WRONG_USERNAME_OR_PWD_OR_API_KEY_OR_API_SECRET,
<<"Check username/password or api_key/api_secret">>
)
end;
{error, _} ->
return_unauthorized(?WRONG_USERNAME_OR_PWD, <<"Check username/password">>)
end;
{bearer, Token} -> {bearer, Token} ->
case emqx_dashboard_admin:verify_token(Token) of case emqx_dashboard_admin:verify_token(Token) of
ok -> ok ->
@ -269,3 +252,20 @@ i18n_file() ->
listeners() -> listeners() ->
emqx_conf:get([dashboard, listeners], []). emqx_conf:get([dashboard, listeners], []).
api_key_authorize(Req, Key, Secret) ->
Path = cowboy_req:path(Req),
case emqx_mgmt_auth:authorize(Path, Key, Secret) of
ok ->
ok;
{error, <<"not_allowed">>} ->
return_unauthorized(
?BAD_API_KEY_OR_SECRET,
<<"Not allowed, Check api_key/api_secret">>
);
{error, _} ->
return_unauthorized(
?BAD_API_KEY_OR_SECRET,
<<"Check api_key/api_secret">>
)
end.

View File

@ -47,7 +47,7 @@
-define(EMPTY(V), (V == undefined orelse V == <<>>)). -define(EMPTY(V), (V == undefined orelse V == <<>>)).
-define(WRONG_USERNAME_OR_PWD, 'WRONG_USERNAME_OR_PWD'). -define(BAD_USERNAME_OR_PWD, 'BAD_USERNAME_OR_PWD').
-define(WRONG_TOKEN_OR_USERNAME, 'WRONG_TOKEN_OR_USERNAME'). -define(WRONG_TOKEN_OR_USERNAME, 'WRONG_TOKEN_OR_USERNAME').
-define(USER_NOT_FOUND, 'USER_NOT_FOUND'). -define(USER_NOT_FOUND, 'USER_NOT_FOUND').
-define(ERROR_PWD_NOT_MATCH, 'ERROR_PWD_NOT_MATCH'). -define(ERROR_PWD_NOT_MATCH, 'ERROR_PWD_NOT_MATCH').
@ -164,7 +164,7 @@ schema("/users/:username/change_pwd") ->
}. }.
response_schema(401) -> response_schema(401) ->
emqx_dashboard_swagger:error_codes([?WRONG_USERNAME_OR_PWD], ?DESC(login_failed401)); emqx_dashboard_swagger:error_codes([?BAD_USERNAME_OR_PWD], ?DESC(login_failed401));
response_schema(404) -> response_schema(404) ->
emqx_dashboard_swagger:error_codes([?USER_NOT_FOUND], ?DESC(users_api404)). emqx_dashboard_swagger:error_codes([?USER_NOT_FOUND], ?DESC(users_api404)).
@ -223,7 +223,7 @@ login(post, #{body := Params}) ->
}}; }};
{error, R} -> {error, R} ->
?SLOG(info, #{msg => "Dashboard login failed", username => Username, reason => R}), ?SLOG(info, #{msg => "Dashboard login failed", username => Username, reason => R}),
{401, ?WRONG_USERNAME_OR_PWD, <<"Auth failed">>} {401, ?BAD_USERNAME_OR_PWD, <<"Auth failed">>}
end. end.
logout(_, #{ logout(_, #{

View File

@ -139,14 +139,20 @@ fields(limit) ->
[{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}]; [{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}];
fields(count) -> fields(count) ->
Desc = << Desc = <<
"Total number of records counted.<br/>" "Total number of records matching the query.<br/>"
"Note: this field is <code>0</code> when the queryed table is empty, " "Note: this field is present only if the query can be optimized and does "
"or if the query can not be optimized and requires a full table scan." "not require a full table scan."
>>,
Meta = #{desc => Desc, required => false},
[{count, hoconsc:mk(non_neg_integer(), Meta)}];
fields(hasnext) ->
Desc = <<
"Flag indicating whether there are more results available on next pages."
>>, >>,
Meta = #{desc => Desc, required => true}, Meta = #{desc => Desc, required => true},
[{count, hoconsc:mk(non_neg_integer(), Meta)}]; [{hasnext, hoconsc:mk(boolean(), Meta)}];
fields(meta) -> fields(meta) ->
fields(page) ++ fields(limit) ++ fields(count). fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext).
-spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema_map(). -spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema_map().
schema_with_example(Type, Example) -> schema_with_example(Type, Example) ->
@ -708,6 +714,8 @@ typename_to_spec("qos()", _Mod) ->
#{type => integer, minimum => 0, maximum => 2, example => 0}; #{type => integer, minimum => 0, maximum => 2, example => 0};
typename_to_spec("{binary(), binary()}", _Mod) -> typename_to_spec("{binary(), binary()}", _Mod) ->
#{type => object, example => #{}}; #{type => object, example => #{}};
typename_to_spec("{string(), string()}", _Mod) ->
#{type => object, example => #{}};
typename_to_spec("comma_separated_list()", _Mod) -> typename_to_spec("comma_separated_list()", _Mod) ->
#{type => string, example => <<"item1,item2">>}; #{type => string, example => <<"item1,item2">>};
typename_to_spec("comma_separated_binary()", _Mod) -> typename_to_spec("comma_separated_binary()", _Mod) ->

View File

@ -114,9 +114,9 @@ t_admin_delete_self_failed(_) ->
?assertEqual(1, length(Admins)), ?assertEqual(1, length(Admins)),
Header = auth_header_(<<"username1">>, <<"password">>), Header = auth_header_(<<"username1">>, <<"password">>),
{error, {_, 400, _}} = request_dashboard(delete, api_path(["users", "username1"]), Header), {error, {_, 400, _}} = request_dashboard(delete, api_path(["users", "username1"]), Header),
Token = erlang:iolist_to_binary(["Basic ", base64:encode("username1:password")]), Token = ["Basic ", base64:encode("username1:password")],
Header2 = {"Authorization", Token}, Header2 = {"Authorization", Token},
{error, {_, 400, _}} = request_dashboard(delete, api_path(["users", "username1"]), Header2), {error, {_, 401, _}} = request_dashboard(delete, api_path(["users", "username1"]), Header2),
mnesia:clear_table(?ADMIN). mnesia:clear_table(?ADMIN).
t_rest_api(_Config) -> t_rest_api(_Config) ->

View File

@ -25,43 +25,24 @@
-define(SERVER, "http://127.0.0.1:18083/api/v5"). -define(SERVER, "http://127.0.0.1:18083/api/v5").
-import(emqx_mgmt_api_test_util, [request/2]).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
mria:start(), mria:start(),
application:load(emqx_dashboard), emqx_mgmt_api_test_util:init_suite([emqx_conf]),
emqx_common_test_helpers:start_apps([emqx_conf, emqx_dashboard], fun set_special_configs/1),
Config. Config.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config(),
ok;
set_special_configs(_) ->
ok.
end_per_suite(Config) -> end_per_suite(Config) ->
end_suite(), end_suite(),
Config. Config.
end_suite() -> end_suite() ->
application:unload(emqx_management), emqx_mgmt_api_test_util:end_suite([emqx_conf]).
emqx_common_test_helpers:stop_apps([emqx_dashboard]).
t_bad_api_path(_) -> t_bad_api_path(_) ->
Url = ?SERVER ++ "/for/test/some/path/not/exist", Url = ?SERVER ++ "/for/test/some/path/not/exist",
{error, {"HTTP/1.1", 404, "Not Found"}} = request(Url), {ok, 404, _} = request(get, Url),
ok. ok.
request(Url) ->
Request = {Url, []},
case httpc:request(get, Request, [], []) of
{error, Reason} ->
{error, Reason};
{ok, {{"HTTP/1.1", Code, _}, _, Return}} when
Code >= 200 andalso Code =< 299
->
{ok, emqx_json:decode(Return, [return_maps])};
{ok, {Reason, _, _}} ->
{error, Reason}
end.

View File

@ -19,6 +19,8 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_SUITE, [auth_header_/0]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
@ -153,10 +155,6 @@ do_request_api(Method, Request) ->
{error, Reason} {error, Reason}
end. end.
auth_header_() ->
Basic = binary_to_list(base64:encode(<<"admin:public">>)),
{"Authorization", "Basic " ++ Basic}.
restart_monitor() -> restart_monitor() ->
OldMonitor = erlang:whereis(emqx_dashboard_monitor), OldMonitor = erlang:whereis(emqx_dashboard_monitor),
erlang:exit(OldMonitor, kill), erlang:exit(OldMonitor, kill),

View File

@ -347,13 +347,7 @@ do_request_api(Method, Request) ->
end. end.
auth_header_() -> auth_header_() ->
AppId = <<"admin">>, emqx_mgmt_api_test_util:auth_header_().
AppSecret = <<"public">>,
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User, ":", Pass])),
{"Authorization", "Basic " ++ Encoded}.
api_path(Parts) -> api_path(Parts) ->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts). ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).

View File

@ -49,12 +49,15 @@
]). ]).
-elvis([{elvis_style, dont_repeat_yourself, disable}]). -elvis([{elvis_style, dont_repeat_yourself, disable}]).
-export([namespace/0, roots/0, fields/1, desc/1]). -export([namespace/0, roots/0, fields/1, desc/1, tags/0]).
-export([proxy_protocol_opts/0]). -export([proxy_protocol_opts/0]).
namespace() -> gateway. namespace() -> gateway.
tags() ->
[<<"Gateway">>].
roots() -> [gateway]. roots() -> [gateway].
fields(gateway) -> fields(gateway) ->

View File

@ -106,8 +106,6 @@ assert_fields_exist(Ks, Map) ->
%% http %% http
-define(http_api_host, "http://127.0.0.1:18083/api/v5"). -define(http_api_host, "http://127.0.0.1:18083/api/v5").
-define(default_user, "admin").
-define(default_pass, "public").
request(delete = Mth, Path) -> request(delete = Mth, Path) ->
do_request(Mth, req(Path, [])); do_request(Mth, req(Path, []));
@ -176,5 +174,4 @@ url(Path, Qs) ->
lists:concat([?http_api_host, Path, "?", binary_to_list(cow_qs:qs(Qs))]). lists:concat([?http_api_host, Path, "?", binary_to_list(cow_qs:qs(Qs))]).
auth(Headers) -> auth(Headers) ->
Token = base64:encode(?default_user ++ ":" ++ ?default_pass), [emqx_mgmt_api_test_util:auth_header_() | Headers].
[{"Authorization", "Basic " ++ binary_to_list(Token)}] ++ Headers.

View File

@ -20,7 +20,6 @@
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]). -elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]).
-define(FRESH_SELECT, fresh_select).
-define(LONG_QUERY_TIMEOUT, 50000). -define(LONG_QUERY_TIMEOUT, 50000).
-export([ -export([
@ -174,13 +173,12 @@ do_node_query(
case do_query(Node, QueryState) of case do_query(Node, QueryState) of
{error, {badrpc, R}} -> {error, {badrpc, R}} ->
{error, Node, {badrpc, R}}; {error, Node, {badrpc, R}};
{Rows, NQueryState = #{continuation := ?FRESH_SELECT}} -> {Rows, NQueryState = #{complete := Complete}} ->
{_, NResultAcc} = accumulate_query_rows(Node, Rows, NQueryState, ResultAcc),
NResultAcc;
{Rows, NQueryState} ->
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
{enough, NResultAcc} -> {enough, NResultAcc} ->
NResultAcc; finalize_query(NResultAcc, NQueryState);
{_, NResultAcc} when Complete ->
finalize_query(NResultAcc, NQueryState);
{more, NResultAcc} -> {more, NResultAcc} ->
do_node_query(Node, NQueryState, NResultAcc) do_node_query(Node, NQueryState, NResultAcc)
end end
@ -212,8 +210,8 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) ->
end. end.
%% @private %% @private
do_cluster_query([], _QueryState, ResultAcc) -> do_cluster_query([], QueryState, ResultAcc) ->
ResultAcc; finalize_query(ResultAcc, mark_complete(QueryState));
do_cluster_query( do_cluster_query(
[Node | Tail] = Nodes, [Node | Tail] = Nodes,
QueryState, QueryState,
@ -222,31 +220,29 @@ do_cluster_query(
case do_query(Node, QueryState) of case do_query(Node, QueryState) of
{error, {badrpc, R}} -> {error, {badrpc, R}} ->
{error, Node, {badrpc, R}}; {error, Node, {badrpc, R}};
{Rows, NQueryState} -> {Rows, NQueryState = #{complete := Complete}} ->
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
{enough, NResultAcc} -> {enough, NResultAcc} ->
maybe_collect_total_from_tail_nodes(Tail, NQueryState, NResultAcc); FQueryState = maybe_collect_total_from_tail_nodes(Tail, NQueryState),
FComplete = Complete andalso Tail =:= [],
finalize_query(NResultAcc, mark_complete(FQueryState, FComplete));
{more, NResultAcc} when not Complete ->
do_cluster_query(Nodes, NQueryState, NResultAcc);
{more, NResultAcc} when Tail =/= [] ->
do_cluster_query(Tail, reset_query_state(NQueryState), NResultAcc);
{more, NResultAcc} -> {more, NResultAcc} ->
NextNodes = finalize_query(NResultAcc, NQueryState)
case NQueryState of
#{continuation := ?FRESH_SELECT} -> Tail;
_ -> Nodes
end,
do_cluster_query(NextNodes, NQueryState, NResultAcc)
end end
end. end.
maybe_collect_total_from_tail_nodes([], _QueryState, ResultAcc) -> maybe_collect_total_from_tail_nodes([], QueryState) ->
ResultAcc; QueryState;
maybe_collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc) -> maybe_collect_total_from_tail_nodes(Nodes, QueryState = #{total := _}) ->
case counting_total_fun(QueryState) of collect_total_from_tail_nodes(Nodes, QueryState);
false -> maybe_collect_total_from_tail_nodes(_Nodes, QueryState) ->
ResultAcc; QueryState.
_Fun ->
collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc)
end.
collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc}) -> collect_total_from_tail_nodes(Nodes, QueryState = #{total := TotalAcc}) ->
%% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node %% XXX: badfun risk? if the FuzzyFun is an anonumous func in local node
case rpc:multicall(Nodes, ?MODULE, apply_total_query, [QueryState], ?LONG_QUERY_TIMEOUT) of case rpc:multicall(Nodes, ?MODULE, apply_total_query, [QueryState], ?LONG_QUERY_TIMEOUT) of
{_, [Node | _]} -> {_, [Node | _]} ->
@ -257,7 +253,8 @@ collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc
[{Node, {badrpc, Reason}} | _] -> [{Node, {badrpc, Reason}} | _] ->
{error, Node, {badrpc, Reason}}; {error, Node, {badrpc, Reason}};
[] -> [] ->
ResultAcc#{total => ResL ++ TotalAcc} NTotalAcc = maps:merge(TotalAcc, maps:from_list(ResL)),
QueryState#{total := NTotalAcc}
end end
end. end.
@ -266,13 +263,14 @@ collect_total_from_tail_nodes(Nodes, QueryState, ResultAcc = #{total := TotalAcc
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% QueryState :: %% QueryState ::
%% #{continuation := ets:continuation(), %% #{continuation => ets:continuation(),
%% page := pos_integer(), %% page := pos_integer(),
%% limit := pos_integer(), %% limit := pos_integer(),
%% total := [{node(), non_neg_integer()}], %% total => #{node() => non_neg_integer()},
%% table := atom(), %% table := atom(),
%% qs := {Qs, Fuzzy} %% parsed query params %% qs := {Qs, Fuzzy}, %% parsed query params
%% msfun := query_to_match_spec_fun() %% msfun := query_to_match_spec_fun(),
%% complete := boolean()
%% } %% }
init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) -> init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) ->
#{match_spec := Ms, fuzzy_fun := FuzzyFun} = erlang:apply(MsFun, [Tab, QString]), #{match_spec := Ms, fuzzy_fun := FuzzyFun} = erlang:apply(MsFun, [Tab, QString]),
@ -285,17 +283,31 @@ init_query_state(Tab, QString, MsFun, _Meta = #{page := Page, limit := Limit}) -
true = is_list(Args), true = is_list(Args),
{type, external} = erlang:fun_info(NamedFun, type) {type, external} = erlang:fun_info(NamedFun, type)
end, end,
#{ QueryState = #{
page => Page, page => Page,
limit => Limit, limit => Limit,
table => Tab, table => Tab,
qs => QString, qs => QString,
msfun => MsFun, msfun => MsFun,
mactch_spec => Ms, match_spec => Ms,
fuzzy_fun => FuzzyFun, fuzzy_fun => FuzzyFun,
total => [], complete => false
continuation => ?FRESH_SELECT },
}. case counting_total_fun(QueryState) of
false ->
QueryState;
Fun when is_function(Fun) ->
QueryState#{total => #{}}
end.
reset_query_state(QueryState) ->
maps:remove(continuation, mark_complete(QueryState, false)).
mark_complete(QueryState) ->
mark_complete(QueryState, true).
mark_complete(QueryState, Complete) ->
QueryState#{complete => Complete}.
%% @private This function is exempt from BPAPI %% @private This function is exempt from BPAPI
do_query(Node, QueryState) when Node =:= node() -> do_query(Node, QueryState) when Node =:= node() ->
@ -318,47 +330,50 @@ do_select(
Node, Node,
QueryState0 = #{ QueryState0 = #{
table := Tab, table := Tab,
mactch_spec := Ms, match_spec := Ms,
fuzzy_fun := FuzzyFun, limit := Limit,
continuation := Continuation, complete := false
limit := Limit
} }
) -> ) ->
QueryState = maybe_apply_total_query(Node, QueryState0), QueryState = maybe_apply_total_query(Node, QueryState0),
Result = Result =
case Continuation of case maps:get(continuation, QueryState, undefined) of
?FRESH_SELECT -> undefined ->
ets:select(Tab, Ms, Limit); ets:select(Tab, Ms, Limit);
_ -> Continuation ->
%% XXX: Repair is necessary because we pass Continuation back %% XXX: Repair is necessary because we pass Continuation back
%% and forth through the nodes in the `do_cluster_query` %% and forth through the nodes in the `do_cluster_query`
ets:select(ets:repair_continuation(Continuation, Ms)) ets:select(ets:repair_continuation(Continuation, Ms))
end, end,
case Result of case Result of
'$end_of_table' -> {Rows, '$end_of_table'} ->
{[], QueryState#{continuation => ?FRESH_SELECT}}; NRows = maybe_apply_fuzzy_filter(Rows, QueryState),
{NRows, mark_complete(QueryState)};
{Rows, NContinuation} -> {Rows, NContinuation} ->
NRows = NRows = maybe_apply_fuzzy_filter(Rows, QueryState),
case FuzzyFun of {NRows, QueryState#{continuation => NContinuation}};
undefined -> '$end_of_table' ->
Rows; {[], mark_complete(QueryState)}
{FilterFun, Args0} when is_function(FilterFun), is_list(Args0) ->
lists:filter(
fun(E) -> erlang:apply(FilterFun, [E | Args0]) end,
Rows
)
end,
{NRows, QueryState#{continuation => NContinuation}}
end. end.
maybe_apply_total_query(Node, QueryState = #{total := TotalAcc}) -> maybe_apply_fuzzy_filter(Rows, #{fuzzy_fun := undefined}) ->
case proplists:get_value(Node, TotalAcc, undefined) of Rows;
undefined -> maybe_apply_fuzzy_filter(Rows, #{fuzzy_fun := {FilterFun, Args}}) ->
Total = apply_total_query(QueryState), lists:filter(
QueryState#{total := [{Node, Total} | TotalAcc]}; fun(E) -> erlang:apply(FilterFun, [E | Args]) end,
_ -> Rows
QueryState ).
end.
maybe_apply_total_query(Node, QueryState = #{total := Acc}) ->
case Acc of
#{Node := _} ->
QueryState;
#{} ->
NodeTotal = apply_total_query(QueryState),
QueryState#{total := Acc#{Node => NodeTotal}}
end;
maybe_apply_total_query(_Node, QueryState = #{}) ->
QueryState.
apply_total_query(QueryState = #{table := Tab}) -> apply_total_query(QueryState = #{table := Tab}) ->
case counting_total_fun(QueryState) of case counting_total_fun(QueryState) of
@ -371,7 +386,7 @@ apply_total_query(QueryState = #{table := Tab}) ->
counting_total_fun(_QueryState = #{qs := {[], []}}) -> counting_total_fun(_QueryState = #{qs := {[], []}}) ->
fun(Tab) -> ets:info(Tab, size) end; fun(Tab) -> ets:info(Tab, size) end;
counting_total_fun(_QueryState = #{mactch_spec := Ms, fuzzy_fun := undefined}) -> counting_total_fun(_QueryState = #{match_spec := Ms, fuzzy_fun := undefined}) ->
%% XXX: Calculating the total number of data that match a certain %% XXX: Calculating the total number of data that match a certain
%% condition under a large table is very expensive because the %% condition under a large table is very expensive because the
%% entire ETS table needs to be scanned. %% entire ETS table needs to be scanned.
@ -390,15 +405,16 @@ counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= und
%% ResultAcc :: #{count := integer(), %% ResultAcc :: #{count := integer(),
%% cursor := integer(), %% cursor := integer(),
%% rows := [{node(), Rows :: list()}], %% rows := [{node(), Rows :: list()}],
%% total := [{node() => integer()}] %% overflow := boolean(),
%% hasnext => boolean()
%% } %% }
init_query_result() -> init_query_result() ->
#{cursor => 0, count => 0, rows => [], total => []}. #{cursor => 0, count => 0, rows => [], overflow => false}.
accumulate_query_rows( accumulate_query_rows(
Node, Node,
Rows, Rows,
_QueryState = #{page := Page, limit := Limit, total := TotalAcc}, _QueryState = #{page := Page, limit := Limit},
ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc} ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc}
) -> ) ->
PageStart = (Page - 1) * Limit + 1, PageStart = (Page - 1) * Limit + 1,
@ -406,24 +422,35 @@ accumulate_query_rows(
Len = length(Rows), Len = length(Rows),
case Cursor + Len of case Cursor + Len of
NCursor when NCursor < PageStart -> NCursor when NCursor < PageStart ->
{more, ResultAcc#{cursor => NCursor, total => TotalAcc}}; {more, ResultAcc#{cursor => NCursor}};
NCursor when NCursor < PageEnd -> NCursor when NCursor < PageEnd ->
SubRows = lists:nthtail(max(0, PageStart - Cursor - 1), Rows),
{more, ResultAcc#{ {more, ResultAcc#{
cursor => NCursor, cursor => NCursor,
count => Count + length(Rows), count => Count + length(SubRows),
total => TotalAcc, rows => [{Node, SubRows} | RowsAcc]
rows => [{Node, Rows} | RowsAcc]
}}; }};
NCursor when NCursor >= PageEnd -> NCursor when NCursor >= PageEnd ->
SubRows = lists:sublist(Rows, Limit - Count), SubRows = lists:sublist(Rows, Limit - Count),
{enough, ResultAcc#{ {enough, ResultAcc#{
cursor => NCursor, cursor => NCursor,
count => Count + length(SubRows), count => Count + length(SubRows),
total => TotalAcc, rows => [{Node, SubRows} | RowsAcc],
rows => [{Node, SubRows} | RowsAcc] % there are more rows than can fit in the page
overflow => (Limit - Count) < Len
}} }}
end. end.
finalize_query(Result = #{overflow := Overflow}, QueryState = #{complete := Complete}) ->
HasNext = Overflow orelse not Complete,
maybe_accumulate_totals(Result#{hasnext => HasNext}, QueryState).
maybe_accumulate_totals(Result, #{total := TotalAcc}) ->
QueryTotal = maps:fold(fun(_Node, T, N) -> N + T end, 0, TotalAcc),
Result#{total => QueryTotal};
maybe_accumulate_totals(Result, _QueryState) ->
Result.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal Functions %% Internal Functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -520,16 +547,22 @@ is_fuzzy_key(<<"match_", _/binary>>) ->
is_fuzzy_key(_) -> is_fuzzy_key(_) ->
false. false.
format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) -> format_query_result(_FmtFun, _MetaIn, Error = {error, _Node, _Reason}) ->
Error; Error;
format_query_result( format_query_result(
FmtFun, Meta, _ResultAcc = #{total := TotalAcc, rows := RowsAcc} FmtFun, MetaIn, ResultAcc = #{hasnext := HasNext, rows := RowsAcc}
) -> ) ->
Total = lists:foldr(fun({_Node, T}, N) -> N + T end, 0, TotalAcc), Meta =
case ResultAcc of
#{total := QueryTotal} ->
%% The `count` is used in HTTP API to indicate the total number of
%% queries that can be read
MetaIn#{hasnext => HasNext, count => QueryTotal};
#{} ->
MetaIn#{hasnext => HasNext}
end,
#{ #{
%% The `count` is used in HTTP API to indicate the total number of meta => Meta,
%% queries that can be read
meta => Meta#{count => Total},
data => lists:flatten( data => lists:flatten(
lists:foldl( lists:foldl(
fun({Node, Rows}, Acc) -> fun({Node, Rows}, Acc) ->
@ -552,7 +585,7 @@ parse_pager_params(Params) ->
Limit = b2i(limit(Params)), Limit = b2i(limit(Params)),
case Page > 0 andalso Limit > 0 of case Page > 0 andalso Limit > 0 of
true -> true ->
#{page => Page, limit => Limit, count => 0}; #{page => Page, limit => Limit};
false -> false ->
false false
end. end.

View File

@ -40,6 +40,10 @@
do_force_create_app/3 do_force_create_app/3
]). ]).
-ifdef(TEST).
-export([create/5]).
-endif.
-define(APP, emqx_app). -define(APP, emqx_app).
-record(?APP, { -record(?APP, {
@ -68,8 +72,12 @@ init_bootstrap_file() ->
init_bootstrap_file(File). init_bootstrap_file(File).
create(Name, Enable, ExpiredAt, Desc) -> create(Name, Enable, ExpiredAt, Desc) ->
ApiSecret = generate_api_secret(),
create(Name, ApiSecret, Enable, ExpiredAt, Desc).
create(Name, ApiSecret, Enable, ExpiredAt, Desc) ->
case mnesia:table_info(?APP, size) < 100 of case mnesia:table_info(?APP, size) < 100 of
true -> create_app(Name, Enable, ExpiredAt, Desc); true -> create_app(Name, ApiSecret, Enable, ExpiredAt, Desc);
false -> {error, "Maximum ApiKey"} false -> {error, "Maximum ApiKey"}
end. end.
@ -157,8 +165,7 @@ to_map(#?APP{name = N, api_key = K, enable = E, expired_at = ET, created_at = CT
is_expired(undefined) -> false; is_expired(undefined) -> false;
is_expired(ExpiredTime) -> ExpiredTime < erlang:system_time(second). is_expired(ExpiredTime) -> ExpiredTime < erlang:system_time(second).
create_app(Name, Enable, ExpiredAt, Desc) -> create_app(Name, ApiSecret, Enable, ExpiredAt, Desc) ->
ApiSecret = generate_api_secret(),
App = App =
#?APP{ #?APP{
name = Name, name = Name,
@ -170,9 +177,10 @@ create_app(Name, Enable, ExpiredAt, Desc) ->
api_key = list_to_binary(emqx_misc:gen_id(16)) api_key = list_to_binary(emqx_misc:gen_id(16))
}, },
case create_app(App) of case create_app(App) of
{error, api_key_already_existed} -> create_app(Name, Enable, ExpiredAt, Desc); {ok, Res} ->
{ok, Res} -> {ok, Res#{api_secret => ApiSecret}}; {ok, Res#{api_secret => ApiSecret}};
Error -> Error Error ->
Error
end. end.
create_app(App = #?APP{api_key = ApiKey, name = Name}) -> create_app(App = #?APP{api_key = ApiKey, name = Name}) ->

View File

@ -88,10 +88,9 @@ t_cluster_query(_Config) ->
%% fuzzy searching can't return total %% fuzzy searching can't return total
{200, ClientsNode2} = query_clients(Node2, #{<<"like_username">> => <<"corenode2">>}), {200, ClientsNode2} = query_clients(Node2, #{<<"like_username">> => <<"corenode2">>}),
?assertMatch( MetaNode2 = maps:get(meta, ClientsNode2),
#{count := 0}, ?assertNotMatch(#{count := _}, MetaNode2),
maps:get(meta, ClientsNode2) ?assertMatch(#{hasnext := false}, MetaNode2),
),
?assertMatch(10, length(maps:get(data, ClientsNode2))), ?assertMatch(10, length(maps:get(data, ClientsNode2))),
_ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1), _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1),

View File

@ -225,21 +225,23 @@ t_create_unexpired_app(_Config) ->
ok. ok.
list_app() -> list_app() ->
AuthHeader = emqx_dashboard_SUITE:auth_header_(),
Path = emqx_mgmt_api_test_util:api_path(["api_key"]), Path = emqx_mgmt_api_test_util:api_path(["api_key"]),
case emqx_mgmt_api_test_util:request_api(get, Path) of case emqx_mgmt_api_test_util:request_api(get, Path, AuthHeader) of
{ok, Apps} -> {ok, emqx_json:decode(Apps, [return_maps])}; {ok, Apps} -> {ok, emqx_json:decode(Apps, [return_maps])};
Error -> Error Error -> Error
end. end.
read_app(Name) -> read_app(Name) ->
AuthHeader = emqx_dashboard_SUITE:auth_header_(),
Path = emqx_mgmt_api_test_util:api_path(["api_key", Name]), Path = emqx_mgmt_api_test_util:api_path(["api_key", Name]),
case emqx_mgmt_api_test_util:request_api(get, Path) of case emqx_mgmt_api_test_util:request_api(get, Path, AuthHeader) of
{ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
Error -> Error Error -> Error
end. end.
create_app(Name) -> create_app(Name) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(), AuthHeader = emqx_dashboard_SUITE:auth_header_(),
Path = emqx_mgmt_api_test_util:api_path(["api_key"]), Path = emqx_mgmt_api_test_util:api_path(["api_key"]),
ExpiredAt = to_rfc3339(erlang:system_time(second) + 1000), ExpiredAt = to_rfc3339(erlang:system_time(second) + 1000),
App = #{ App = #{
@ -254,7 +256,7 @@ create_app(Name) ->
end. end.
create_unexpired_app(Name, Params) -> create_unexpired_app(Name, Params) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(), AuthHeader = emqx_dashboard_SUITE:auth_header_(),
Path = emqx_mgmt_api_test_util:api_path(["api_key"]), Path = emqx_mgmt_api_test_util:api_path(["api_key"]),
App = maps:merge(#{name => Name, desc => <<"Note"/utf8>>, enable => true}, Params), App = maps:merge(#{name => Name, desc => <<"Note"/utf8>>, enable => true}, Params),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, App) of case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, App) of
@ -263,11 +265,12 @@ create_unexpired_app(Name, Params) ->
end. end.
delete_app(Name) -> delete_app(Name) ->
AuthHeader = emqx_dashboard_SUITE:auth_header_(),
DeletePath = emqx_mgmt_api_test_util:api_path(["api_key", Name]), DeletePath = emqx_mgmt_api_test_util:api_path(["api_key", Name]),
emqx_mgmt_api_test_util:request_api(delete, DeletePath). emqx_mgmt_api_test_util:request_api(delete, DeletePath, AuthHeader).
update_app(Name, Change) -> update_app(Name, Change) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(), AuthHeader = emqx_dashboard_SUITE:auth_header_(),
UpdatePath = emqx_mgmt_api_test_util:api_path(["api_key", Name]), UpdatePath = emqx_mgmt_api_test_util:api_path(["api_key", Name]),
case emqx_mgmt_api_test_util:request_api(put, UpdatePath, "", AuthHeader, Change) of case emqx_mgmt_api_test_util:request_api(put, UpdatePath, "", AuthHeader, Change) of
{ok, Update} -> {ok, emqx_json:decode(Update, [return_maps])}; {ok, Update} -> {ok, emqx_json:decode(Update, [return_maps])};

View File

@ -44,9 +44,8 @@ init_per_suite(Config) ->
end_per_suite(_) -> end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite(). emqx_mgmt_api_test_util:end_suite().
t_subscription_api(_) -> t_subscription_api(Config) ->
{ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}), Client = proplists:get_value(client, Config),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe( {ok, _, _} = emqtt:subscribe(
Client, [ Client, [
{?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]} {?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]}
@ -84,40 +83,78 @@ t_subscription_api(_) ->
?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2), ?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2),
?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID), ?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID),
QS = uri_string:compose_query([ QS = [
{"clientid", ?CLIENTID}, {"clientid", ?CLIENTID},
{"topic", ?TOPIC2_TOPIC_ONLY}, {"topic", ?TOPIC2_TOPIC_ONLY},
{"node", atom_to_list(node())}, {"node", atom_to_list(node())},
{"qos", "0"}, {"qos", "0"},
{"share_group", "test_group"}, {"share_group", "test_group"},
{"match_topic", "t/#"} {"match_topic", "t/#"}
]), ],
Headers = emqx_mgmt_api_test_util:auth_header_(), Headers = emqx_mgmt_api_test_util:auth_header_(),
{ok, ResponseTopic2} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers), DataTopic2 = #{<<"meta">> := Meta2} = request_json(get, QS, Headers),
DataTopic2 = emqx_json:decode(ResponseTopic2, [return_maps]),
Meta2 = maps:get(<<"meta">>, DataTopic2),
?assertEqual(1, maps:get(<<"page">>, Meta2)), ?assertEqual(1, maps:get(<<"page">>, Meta2)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta2)), ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta2)),
?assertEqual(1, maps:get(<<"count">>, Meta2)), ?assertEqual(1, maps:get(<<"count">>, Meta2)),
SubscriptionsList2 = maps:get(<<"data">>, DataTopic2), SubscriptionsList2 = maps:get(<<"data">>, DataTopic2),
?assertEqual(length(SubscriptionsList2), 1), ?assertEqual(length(SubscriptionsList2), 1).
MatchQs = uri_string:compose_query([ t_subscription_fuzzy_search(Config) ->
Client = proplists:get_value(client, Config),
Topics = [
<<"t/foo">>,
<<"t/foo/bar">>,
<<"t/foo/baz">>,
<<"topic/foo/bar">>,
<<"topic/foo/baz">>
],
_ = [{ok, _, _} = emqtt:subscribe(Client, T) || T <- Topics],
Headers = emqx_mgmt_api_test_util:auth_header_(),
MatchQs = [
{"clientid", ?CLIENTID}, {"clientid", ?CLIENTID},
{"node", atom_to_list(node())}, {"node", atom_to_list(node())},
{"qos", "0"},
{"match_topic", "t/#"} {"match_topic", "t/#"}
]), ],
{ok, MatchRes} = emqx_mgmt_api_test_util:request_api(get, Path, MatchQs, Headers), MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers),
MatchData = emqx_json:decode(MatchRes, [return_maps]), ?assertEqual(1, maps:get(<<"page">>, MatchMeta1)),
MatchMeta = maps:get(<<"meta">>, MatchData), ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta1)),
?assertEqual(1, maps:get(<<"page">>, MatchMeta)), %% count is undefined in fuzzy searching
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta)), ?assertNot(maps:is_key(<<"count">>, MatchMeta1)),
%% count equals 0 in fuzzy searching ?assertMatch(3, length(maps:get(<<"data">>, MatchData1))),
?assertEqual(0, maps:get(<<"count">>, MatchMeta)), ?assertEqual(false, maps:get(<<"hasnext">>, MatchMeta1)),
MatchSubs = maps:get(<<"data">>, MatchData),
?assertEqual(1, length(MatchSubs)),
LimitMatchQuery = [
{"clientid", ?CLIENTID},
{"match_topic", "+/+/+"},
{"limit", "3"}
],
MatchData2 = #{<<"meta">> := MatchMeta2} = request_json(get, LimitMatchQuery, Headers),
?assertEqual(#{<<"page">> => 1, <<"limit">> => 3, <<"hasnext">> => true}, MatchMeta2),
?assertEqual(3, length(maps:get(<<"data">>, MatchData2))),
MatchData2P2 =
#{<<"meta">> := MatchMeta2P2} =
request_json(get, [{"page", "2"} | LimitMatchQuery], Headers),
?assertEqual(#{<<"page">> => 2, <<"limit">> => 3, <<"hasnext">> => false}, MatchMeta2P2),
?assertEqual(1, length(maps:get(<<"data">>, MatchData2P2))).
request_json(Method, Query, Headers) when is_list(Query) ->
Qs = uri_string:compose_query(Query),
{ok, MatchRes} = emqx_mgmt_api_test_util:request_api(Method, path(), Qs, Headers),
emqx_json:decode(MatchRes, [return_maps]).
path() ->
emqx_mgmt_api_test_util:api_path(["subscriptions"]).
init_per_testcase(_TC, Config) ->
{ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}),
{ok, _} = emqtt:connect(Client),
[{client, Client} | Config].
end_per_testcase(_TC, Config) ->
Client = proplists:get_value(client, Config),
emqtt:disconnect(Client). emqtt:disconnect(Client).

View File

@ -24,14 +24,19 @@ init_suite() ->
init_suite([]). init_suite([]).
init_suite(Apps) -> init_suite(Apps) ->
init_suite(Apps, fun set_special_configs/1).
init_suite(Apps, SetConfigs) ->
mria:start(), mria:start(),
application:load(emqx_management), application:load(emqx_management),
emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], fun set_special_configs/1). emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], SetConfigs),
emqx_common_test_http:create_default_app().
end_suite() -> end_suite() ->
end_suite([]). end_suite([]).
end_suite(Apps) -> end_suite(Apps) ->
emqx_common_test_http:delete_default_app(),
application:unload(emqx_management), application:unload(emqx_management),
emqx_common_test_helpers:stop_apps(Apps ++ [emqx_dashboard]), emqx_common_test_helpers:stop_apps(Apps ++ [emqx_dashboard]),
emqx_config:delete_override_conf_files(), emqx_config:delete_override_conf_files(),
@ -43,8 +48,23 @@ set_special_configs(emqx_dashboard) ->
set_special_configs(_App) -> set_special_configs(_App) ->
ok. ok.
%% there is no difference between the 'request' and 'request_api'
%% the 'request' is only to be compatible with the 'emqx_dashboard_api_test_helpers:request'
request(Method, Url) ->
request(Method, Url, []).
request(Method, Url, Body) ->
request_api_with_body(Method, Url, Body).
uri(Parts) ->
emqx_dashboard_api_test_helpers:uri(Parts).
%% compatible_mode will return as same as 'emqx_dashboard_api_test_helpers:request'
request_api_with_body(Method, Url, Body) ->
request_api(Method, Url, [], auth_header_(), Body, #{compatible_mode => true}).
request_api(Method, Url) -> request_api(Method, Url) ->
request_api(Method, Url, [], [], [], #{}). request_api(Method, Url, auth_header_()).
request_api(Method, Url, AuthOrHeaders) -> request_api(Method, Url, AuthOrHeaders) ->
request_api(Method, Url, [], AuthOrHeaders, [], #{}). request_api(Method, Url, [], AuthOrHeaders, [], #{}).
@ -90,10 +110,20 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, Body, Opts) when
do_request_api(Method, Request, Opts) -> do_request_api(Method, Request, Opts) ->
ReturnAll = maps:get(return_all, Opts, false), ReturnAll = maps:get(return_all, Opts, false),
CompatibleMode = maps:get(compatible_mode, Opts, false),
ReqOpts =
case CompatibleMode of
true ->
[{body_format, binary}];
_ ->
[]
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]), ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of case httpc:request(Method, Request, [], ReqOpts) of
{error, socket_closed_remotely} -> {error, socket_closed_remotely} ->
{error, socket_closed_remotely}; {error, socket_closed_remotely};
{ok, {{_, Code, _}, _Headers, Body}} when CompatibleMode ->
{ok, Code, Body};
{ok, {{"HTTP/1.1", Code, _} = Reason, Headers, Body}} when {ok, {{"HTTP/1.1", Code, _} = Reason, Headers, Body}} when
Code >= 200 andalso Code =< 299 andalso ReturnAll Code >= 200 andalso Code =< 299 andalso ReturnAll
-> ->
@ -109,10 +139,7 @@ do_request_api(Method, Request, Opts) ->
end. end.
auth_header_() -> auth_header_() ->
Username = <<"admin">>, emqx_common_test_http:default_auth_header().
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
build_http_header(X) when is_list(X) -> build_http_header(X) when is_list(X) ->
X; X;

View File

@ -30,6 +30,8 @@
-define(API_VERSION, "v5"). -define(API_VERSION, "v5").
-define(BASE_PATH, "api"). -define(BASE_PATH, "api").
-import(emqx_dashboard_SUITE, [auth_header_/0]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -330,13 +332,6 @@ t_stream_log(_Config) ->
to_rfc3339(Second) -> to_rfc3339(Second) ->
list_to_binary(calendar:system_time_to_rfc3339(Second)). list_to_binary(calendar:system_time_to_rfc3339(Second)).
auth_header_() ->
auth_header_("admin", "public").
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User, ":", Pass])),
{"Authorization", "Basic " ++ Encoded}.
request_api(Method, Url, Auth) -> do_request_api(Method, {Url, [Auth]}). request_api(Method, Url, Auth) -> do_request_api(Method, {Url, [Auth]}).
request_api(Method, Url, Auth, Body) -> request_api(Method, Url, Auth, Body) ->

View File

@ -26,7 +26,7 @@
<<"max_delayed_messages">> => <<"0">> <<"max_delayed_messages">> => <<"0">>
}). }).
-import(emqx_dashboard_api_test_helpers, [request/2, request/3, uri/1]). -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1]).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -36,27 +36,21 @@ init_per_suite(Config) ->
raw_with_default => true raw_with_default => true
}), }),
ok = emqx_common_test_helpers:start_apps( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_modules, emqx_dashboard], [emqx_conf, emqx_modules]
fun set_special_configs/1
), ),
emqx_delayed:load(), emqx_delayed:load(),
Config. Config.
end_per_suite(Config) -> end_per_suite(Config) ->
ok = emqx_delayed:unload(), ok = emqx_delayed:unload(),
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]), emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_modules]),
Config. Config.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(), {ok, _} = emqx_cluster_rpc:start_link(),
Config. Config.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(_App) ->
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Test Cases %% Test Cases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -18,7 +18,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]). -import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
@ -37,20 +37,14 @@ init_per_suite(Config) ->
raw_with_default => true raw_with_default => true
}), }),
ok = emqx_common_test_helpers:start_apps( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_modules, emqx_dashboard], [emqx_conf, emqx_modules]
fun set_special_configs/1
), ),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]), emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_modules]),
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(_App) ->
ok. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -81,7 +75,7 @@ t_mqtt_topic_rewrite(_) ->
?assertEqual( ?assertEqual(
Rules, Rules,
jsx:decode(Result) emqx_json:decode(Result, [return_maps])
). ).
t_mqtt_topic_rewrite_limit(_) -> t_mqtt_topic_rewrite_limit(_) ->

View File

@ -18,7 +18,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/2, request/3, uri/1]). -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
@ -33,8 +33,8 @@ init_per_suite(Config) ->
raw_with_default => true raw_with_default => true
}), }),
ok = emqx_common_test_helpers:start_apps( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_authn, emqx_authz, emqx_modules, emqx_dashboard], [emqx_conf, emqx_authn, emqx_authz, emqx_modules],
fun set_special_configs/1 fun set_special_configs/1
), ),
@ -49,8 +49,8 @@ end_per_suite(_Config) ->
<<"sources">> => [] <<"sources">> => []
} }
), ),
emqx_common_test_helpers:stop_apps([ emqx_mgmt_api_test_util:end_suite([
emqx_dashboard, emqx_conf, emqx_authn, emqx_authz, emqx_modules emqx_conf, emqx_authn, emqx_authz, emqx_modules
]), ]),
ok. ok.

View File

@ -18,7 +18,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]). -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
@ -44,9 +44,8 @@ init_per_suite(Config) ->
raw_with_default => true raw_with_default => true
}), }),
ok = emqx_common_test_helpers:start_apps( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_modules, emqx_dashboard], [emqx_conf, emqx_modules]
fun set_special_configs/1
), ),
%% When many tests run in an obscure order, it may occur that %% When many tests run in an obscure order, it may occur that
@ -59,15 +58,10 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]), emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_modules]),
application:stop(gen_rpc), application:stop(gen_rpc),
ok. ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(_App) ->
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Tests %% Tests
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -315,6 +309,3 @@ t_badrpc(_) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helpers %% Helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
request(Method, Url) ->
request(Method, Url, []).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_plugin_libs, [ {application, emqx_plugin_libs, [
{description, "EMQX Plugin utility libs"}, {description, "EMQX Plugin utility libs"},
{vsn, "4.3.4"}, {vsn, "4.3.5"},
{modules, []}, {modules, []},
{applications, [kernel, stdlib]}, {applications, [kernel, stdlib]},
{env, []} {env, []}

View File

@ -63,6 +63,8 @@
can_topic_match_oneof/2 can_topic_match_oneof/2
]). ]).
-export_type([tmpl_token/0]).
-compile({no_auto_import, [float/1]}). -compile({no_auto_import, [float/1]}).
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").

View File

@ -24,6 +24,35 @@ emqx_prometheus_schema {
zh: """数据推送间隔""" zh: """数据推送间隔"""
} }
} }
headers {
desc {
en: """A list of HTTP Headers when pushing to Push Gateway.<br/>
For example, <code> { Authorization = "some-authz-tokens"}</code>"""
zh: """推送到 Push Gateway 的 HTTP Headers 列表。<br/>
例如,<code> { Authorization = "some-authz-tokens"}</code>"""
}
}
job_name {
desc {
en: """Job Name that is pushed to the Push Gateway. Available variables:<br/>
- ${name}: Name of EMQX node.<br/>
- ${host}: Host name of EMQX node.<br/>
For example, when the EMQX node name is <code>emqx@127.0.0.1</code> then the <code>name</code> variable takes value <code>emqx</code> and the <code>host</code> variable takes value <code>127.0.0.1</code>.<br/>
Default value is: <code>${name}/instance/${name}~${host}</code>
"""
zh: """推送到 Push Gateway 的 Job 名称。可用变量为:<br/>
- ${name}: EMQX 节点的名称。
- ${host}: EMQX 节点主机名。
例如,当 EMQX 节点名为 <code>emqx@127.0.0.1</code> 则 name 变量的值为 <code>emqx</code>host 变量的值为 <code>127.0.0.1</code>。<br/>
默认值为: <code>${name}/instance/${name}~${host}</code>"""
}
}
enable { enable {
desc { desc {
en: """Turn Prometheus data pushing on or off""" en: """Turn Prometheus data pushing on or off"""

View File

@ -2,7 +2,7 @@
{application, emqx_prometheus, [ {application, emqx_prometheus, [
{description, "Prometheus for EMQX"}, {description, "Prometheus for EMQX"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.3"}, {vsn, "5.0.4"},
{modules, []}, {modules, []},
{registered, [emqx_prometheus_sup]}, {registered, [emqx_prometheus_sup]},
{applications, [kernel, stdlib, prometheus, emqx]}, {applications, [kernel, stdlib, prometheus, emqx]},

View File

@ -98,8 +98,13 @@ handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) -> handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) ->
#{interval := Interval, push_gateway_server := Server} = opts(), #{
PushRes = push_to_push_gateway(Server), interval := Interval,
headers := Headers,
job_name := JobName,
push_gateway_server := Server
} = opts(),
PushRes = push_to_push_gateway(Server, Headers, JobName),
NewTimer = ensure_timer(Interval), NewTimer = ensure_timer(Interval),
NewState = maps:update_with(PushRes, fun(C) -> C + 1 end, 1, State#{timer => NewTimer}), NewState = maps:update_with(PushRes, fun(C) -> C + 1 end, 1, State#{timer => NewTimer}),
%% Data is too big, hibernate for saving memory and stop system monitor warning. %% Data is too big, hibernate for saving memory and stop system monitor warning.
@ -107,18 +112,27 @@ handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) ->
handle_info(_Msg, State) -> handle_info(_Msg, State) ->
{noreply, State}. {noreply, State}.
push_to_push_gateway(Uri) -> push_to_push_gateway(Uri, Headers, JobName) when is_list(Headers) ->
[Name, Ip] = string:tokens(atom_to_list(node()), "@"), [Name, Ip] = string:tokens(atom_to_list(node()), "@"),
Url = lists:concat([Uri, "/metrics/job/", Name, "/instance/", Name, "~", Ip]), JobName1 = emqx_placeholder:preproc_tmpl(JobName),
JobName2 = binary_to_list(
emqx_placeholder:proc_tmpl(
JobName1,
#{<<"name">> => Name, <<"host">> => Ip}
)
),
Url = lists:concat([Uri, "/metrics/job/", JobName2]),
Data = prometheus_text_format:format(), Data = prometheus_text_format:format(),
case httpc:request(post, {Url, [], "text/plain", Data}, ?HTTP_OPTIONS, []) of case httpc:request(post, {Url, Headers, "text/plain", Data}, ?HTTP_OPTIONS, []) of
{ok, {{"HTTP/1.1", 200, "OK"}, _Headers, _Body}} -> {ok, {{"HTTP/1.1", 200, _}, _RespHeaders, _RespBody}} ->
ok; ok;
Error -> Error ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "post_to_push_gateway_failed", msg => "post_to_push_gateway_failed",
error => Error, error => Error,
url => Url url => Url,
headers => Headers
}), }),
failed failed
end. end.

View File

@ -121,6 +121,8 @@ prometheus_config_example() ->
enable => true, enable => true,
interval => "15s", interval => "15s",
push_gateway_server => <<"http://127.0.0.1:9091">>, push_gateway_server => <<"http://127.0.0.1:9091">>,
headers => #{'header-name' => 'header-value'},
job_name => <<"${name}/instance/${name}~${host}">>,
vm_dist_collector => enabled, vm_dist_collector => enabled,
mnesia_collector => enabled, mnesia_collector => enabled,
vm_statistics_collector => enabled, vm_statistics_collector => enabled,

View File

@ -25,7 +25,8 @@
roots/0, roots/0,
fields/1, fields/1,
desc/1, desc/1,
translation/1 translation/1,
convert_headers/1
]). ]).
namespace() -> "prometheus". namespace() -> "prometheus".
@ -52,6 +53,26 @@ fields("prometheus") ->
desc => ?DESC(interval) desc => ?DESC(interval)
} }
)}, )},
{headers,
?HOCON(
list({string(), string()}),
#{
default => #{},
required => false,
converter => fun ?MODULE:convert_headers/1,
desc => ?DESC(headers)
}
)},
{job_name,
?HOCON(
binary(),
#{
default => <<"${name}/instance/${name}~${host}">>,
required => true,
desc => ?DESC(job_name)
}
)},
{enable, {enable,
?HOCON( ?HOCON(
boolean(), boolean(),
@ -126,6 +147,17 @@ fields("prometheus") ->
desc("prometheus") -> ?DESC(prometheus); desc("prometheus") -> ?DESC(prometheus);
desc(_) -> undefined. desc(_) -> undefined.
convert_headers(Headers) when is_map(Headers) ->
maps:fold(
fun(K, V, Acc) ->
[{binary_to_list(K), binary_to_list(V)} | Acc]
end,
[],
Headers
);
convert_headers(Headers) when is_list(Headers) ->
Headers.
%% for CI test, CI don't load the whole emqx_conf_schema. %% for CI test, CI don't load the whole emqx_conf_schema.
translation(Name) -> translation(Name) ->
emqx_conf_schema:translation(Name). emqx_conf_schema:translation(Name).

View File

@ -27,6 +27,8 @@
"prometheus {\n" "prometheus {\n"
" push_gateway_server = \"http://127.0.0.1:9091\"\n" " push_gateway_server = \"http://127.0.0.1:9091\"\n"
" interval = \"1s\"\n" " interval = \"1s\"\n"
" headers = { Authorization = \"some-authz-tokens\"}\n"
" job_name = \"${name}~${host}\"\n"
" enable = true\n" " enable = true\n"
" vm_dist_collector = enabled\n" " vm_dist_collector = enabled\n"
" mnesia_collector = enabled\n" " mnesia_collector = enabled\n"
@ -85,6 +87,25 @@ t_collector_no_crash_test(_) ->
prometheus_text_format:format(), prometheus_text_format:format(),
ok. ok.
t_assert_push(_) ->
meck:new(httpc, [passthrough]),
Self = self(),
AssertPush = fun(Method, Req = {Url, Headers, ContentType, _Data}, HttpOpts, Opts) ->
?assertEqual(post, Method),
?assertMatch("http://127.0.0.1:9091/metrics/job/test~127.0.0.1", Url),
?assertEqual([{"Authorization", "some-authz-tokens"}], Headers),
?assertEqual("text/plain", ContentType),
Self ! pass,
meck:passthrough([Method, Req, HttpOpts, Opts])
end,
meck:expect(httpc, request, AssertPush),
?assertMatch(ok, emqx_prometheus_sup:start_child(emqx_prometheus)),
receive
pass -> ok
after 2000 ->
ct:fail(assert_push_request_failed)
end.
t_only_for_coverage(_) -> t_only_for_coverage(_) ->
?assertEqual("5.0.0", emqx_prometheus_proto_v1:introduced_in()), ?assertEqual("5.0.0", emqx_prometheus_proto_v1:introduced_in()),
ok. ok.

View File

@ -36,8 +36,8 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
health_check_interval { health_check_interval {
desc { desc {
en: """Health check interval, in milliseconds.""" en: """Health check interval."""
zh: """健康检查间隔,单位毫秒。""" zh: """健康检查间隔。"""
} }
label { label {
en: """Health Check Interval""" en: """Health Check Interval"""
@ -69,8 +69,8 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
auto_restart_interval { auto_restart_interval {
desc { desc {
en: """The auto restart interval after the resource is disconnected, in milliseconds.""" en: """The auto restart interval after the resource is disconnected."""
zh: """资源断开以后,自动重连的时间间隔,单位毫秒。""" zh: """资源断开以后,自动重连的时间间隔。"""
} }
label { label {
en: """Auto Restart Interval""" en: """Auto Restart Interval"""

View File

@ -23,6 +23,7 @@
-export([ -export([
namespace/0, namespace/0,
tags/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1, desc/1,
@ -33,6 +34,9 @@
namespace() -> rule_engine. namespace() -> rule_engine.
tags() ->
[<<"Rule Engine">>].
roots() -> ["rule_engine"]. roots() -> ["rule_engine"].
fields("rule_engine") -> fields("rule_engine") ->

View File

@ -203,13 +203,7 @@ do_request_api(Method, Request) ->
end. end.
auth_header_() -> auth_header_() ->
AppId = <<"admin">>, emqx_mgmt_api_test_util:auth_header_().
AppSecret = <<"public">>,
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User, ":", Pass])),
{"Authorization", "Basic " ++ Encoded}.
api_path(Parts) -> api_path(Parts) ->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts). ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).

View File

@ -24,12 +24,19 @@ main(Args) ->
["hocon" | Rest] -> ["hocon" | Rest] ->
%% forward the call to hocon_cli %% forward the call to hocon_cli
hocon_cli:main(Rest); hocon_cli:main(Rest);
["check_license_key", Key] -> ["check_license_key", Key0] ->
check_license(#{key => list_to_binary(Key)}); Key = cleanup_key(Key0),
check_license(#{key => Key});
_ -> _ ->
do(Args) do(Args)
end. end.
%% the key is a string (list) representation of a binary, so we need
%% to remove the leading and trailing angle brackets.
cleanup_key(Str0) ->
Str1 = iolist_to_binary(string:replace(Str0, "<<", "", leading)),
iolist_to_binary(string:replace(Str1, ">>", "", trailing)).
do(Args) -> do(Args) ->
ok = do_with_halt(Args, "mnesia_dir", fun create_mnesia_dir/2), ok = do_with_halt(Args, "mnesia_dir", fun create_mnesia_dir/2),
ok = do_with_halt(Args, "chkconfig", fun("-config", X) -> chkconfig(X) end), ok = do_with_halt(Args, "chkconfig", fun("-config", X) -> chkconfig(X) end),

View File

@ -0,0 +1 @@
Basic auth is no longer allowed for API calls, must use API key instead.

View File

@ -0,0 +1 @@
API 调用不再支持基于 `username:password``baisc` 认证, 现在 API 必须通过 API Key 才能进行调用。

View File

@ -0,0 +1,3 @@
Add the following configuration options for Pushing metrics to Prometheus Push Gateway:
- `headers`: Allows custom HTTP request headers.
- `job_name`: allows to customize the name of the Job pushed to Push Gateway.

View File

@ -0,0 +1,3 @@
为 Prometheus 推送到 Push Gateway 新增以下配置项:
- `headers`:允许自定义 HTTP 请求头。
- `job_name`:允许自定义推送到 Push Gateway 的 Job 名称。

View File

@ -0,0 +1,11 @@
Remove the config `auto_reconnect` from the emqx_authz, emqx_authn and data-bridge componets.
This is because we have another config with similar functions: `resource_opts.auto_restart_interval`
The functions of these two config are difficult to distinguish, which will lead to confusion.
After this change, `auto_reconnect` will not be configurable (always be true), and the underlying
drivers that support this config will automatically reconnect the abnormally disconnected
connection every `2s`.
And the config `resource_opts.auto_restart_interval` is still available for user.
It is the time interval that emqx restarts the resource when the connection cannot be
established for some reason.

View File

@ -0,0 +1,8 @@
从认证、鉴权和数据桥接功能中,删除 `auto_reconnect` 配置项,因为我们还有另一个功能类似的配置项:
`resource_opts.auto_restart_interval`
这两个配置项的功能难以区分,会导致困惑。此修改之后,`auto_reconnect` 将不可配置(永远为 true)
支持此配置的底层驱动将以 `2s` 为周期自动重连异常断开的连接。
`resource_opts.auto_restart_interval` 配置项仍然开放给用户配置,它是资源因为某些原因
无法建立连接的时候emqx 重新启动该资源的时间间隔。

View File

@ -0,0 +1 @@
Password information has been removed from information log messages for http, ldap, mongo, mqtt, mysql, pgsql and redis.

View File

@ -0,0 +1 @@
密码信息已从http、ldap、mongo、mqtt、mysql、pgsql和redis的信息日志消息中删除。

View File

@ -0,0 +1,2 @@
Return authorization settings with default values.
The authorization cache is enabled by default, but due to the missing default value in `GET` response of `/authorization/settings`, it seemed to be disabled from the dashboard.

View File

@ -0,0 +1,3 @@
为授权设置 API 返回默认值。
授权缓存默认为开启,但是在此修复前,因为默认值在 `/authorization/settings` 这个 API 的返回值中缺失,
使得在仪表盘配置页面中看起来是关闭了。

View File

@ -0,0 +1 @@
Client fuzzy search API results were missing information which could tell if more results are available in the next pages, this is now fixed by providing `hasnext` flag in the response.

View File

@ -0,0 +1 @@
在此修复前,客户端模糊搜索 API 缺少一些可以用于判断是否可以继续翻页的信息,现在通过在响应中提供 `hasnext` 标志来解决这个问题。

View File

@ -1,6 +1,7 @@
ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-debian11 ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-debian11
ARG RUN_FROM=debian:11-slim ARG RUN_FROM=debian:11-slim
FROM ${BUILD_FROM} AS builder ARG BUILDPLATFORM=linux/amd64
FROM --platform=$BUILDPLATFORM ${BUILD_FROM} AS builder
COPY . /emqx COPY . /emqx

View File

@ -1,6 +1,7 @@
ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-alpine3.15.1 ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-alpine3.15.1
ARG RUN_FROM=alpine:3.15.1 ARG RUN_FROM=alpine:3.15.1
FROM ${BUILD_FROM} AS builder ARG BUILDPLATFORM=linux/amd64
FROM --platform=$BUILDPLATFORM ${BUILD_FROM} AS builder
RUN apk add --no-cache \ RUN apk add --no-cache \
autoconf \ autoconf \

View File

@ -86,4 +86,15 @@ emqx_ee_bridge_mongodb {
zh: "桥接名称" zh: "桥接名称"
} }
} }
payload_template {
desc {
en: "The template for formatting the outgoing messages. If undefined, rule engine will use JSON format to serialize all visible inputs, such as clientid, topic, payload etc."
zh: "用于格式化写入 MongoDB 的消息模板。 如果未定义,规则引擎会使用 JSON 格式序列化所有的可见输入,例如 clientid, topic, payload 等。"
}
label: {
en: "Payload template"
zh: "有效载荷模板"
}
}
} }

View File

@ -61,9 +61,9 @@ resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, u
resource_type(kafka) -> emqx_bridge_impl_kafka; resource_type(kafka) -> emqx_bridge_impl_kafka;
resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub; resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub;
resource_type(mongodb_rs) -> emqx_connector_mongo; resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
resource_type(mongodb_sharded) -> emqx_connector_mongo; resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb;
resource_type(mongodb_single) -> emqx_connector_mongo; resource_type(mongodb_single) -> emqx_ee_connector_mongodb;
resource_type(mysql) -> emqx_connector_mysql; resource_type(mysql) -> emqx_connector_mysql;
resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb;

View File

@ -196,33 +196,25 @@ to_influx_lines(RawLines) ->
converter_influx_line(Line, AccIn) -> converter_influx_line(Line, AccIn) ->
case string:tokens(str(Line), " ") of case string:tokens(str(Line), " ") of
[MeasurementAndTags, Fields, Timestamp] -> [MeasurementAndTags, Fields, Timestamp] ->
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), append_influx_item(MeasurementAndTags, Fields, Timestamp, AccIn);
[
#{
measurement => Measurement,
tags => kv_pairs(Tags),
fields => kv_pairs(string:tokens(Fields, ",")),
timestamp => Timestamp
}
| AccIn
];
[MeasurementAndTags, Fields] -> [MeasurementAndTags, Fields] ->
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags), append_influx_item(MeasurementAndTags, Fields, undefined, AccIn);
%% TODO: fix here both here and influxdb driver.
%% Default value should evaluated by InfluxDB.
[
#{
measurement => Measurement,
tags => kv_pairs(Tags),
fields => kv_pairs(string:tokens(Fields, ",")),
timestamp => "${timestamp}"
}
| AccIn
];
_ -> _ ->
throw("Bad InfluxDB Line Protocol schema") throw("Bad InfluxDB Line Protocol schema")
end. end.
append_influx_item(MeasurementAndTags, Fields, Timestamp, Acc) ->
{Measurement, Tags} = split_measurement_and_tags(MeasurementAndTags),
[
#{
measurement => Measurement,
tags => kv_pairs(Tags),
fields => kv_pairs(string:tokens(Fields, ",")),
timestamp => Timestamp
}
| Acc
].
split_measurement_and_tags(Subject) -> split_measurement_and_tags(Subject) ->
case string:tokens(Subject, ",") of case string:tokens(Subject, ",") of
[] -> [] ->

View File

@ -37,7 +37,8 @@ roots() ->
fields("config") -> fields("config") ->
[ [
{enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})}, {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
{collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})} {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
{payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
]; ];
fields(mongodb_rs) -> fields(mongodb_rs) ->
emqx_connector_mongo:fields(rs) ++ fields("config"); emqx_connector_mongo:fields(rs) ++ fields("config");

View File

@ -51,7 +51,6 @@ values(post) ->
pool_size => 8, pool_size => 8,
username => <<"root">>, username => <<"root">>,
password => <<"">>, password => <<"">>,
auto_reconnect => true,
sql => ?DEFAULT_SQL, sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
resource_opts => #{ resource_opts => #{

View File

@ -53,7 +53,6 @@ values(post, Type) ->
pool_size => 8, pool_size => 8,
username => <<"root">>, username => <<"root">>,
password => <<"public">>, password => <<"public">>,
auto_reconnect => true,
sql => ?DEFAULT_SQL, sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
resource_opts => #{ resource_opts => #{

Some files were not shown because too many files have changed in this diff Show More