Merge remote-tracking branch 'origin/master' into 0529-donot-copy-cluster-conf-from-newer-version

This commit is contained in:
Zaiming (Stone) Shi 2023-06-03 11:21:22 +02:00
commit f469b31fee
166 changed files with 1428 additions and 756 deletions

View File

@ -18,7 +18,7 @@ services:
- /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
kdc:
hostname: kdc.emqx.net
image: ghcr.io/emqx/emqx-builder/5.0-33:1.13.4-24.3.4.2-3-ubuntu20.04
image: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu20.04
container_name: kdc.emqx.net
expose:
- 88 # kdc

View File

@ -3,7 +3,7 @@ version: '3.9'
services:
python:
container_name: python
image: python:3.7.2-alpine3.9
image: python:3.9.16-alpine3.18
depends_on:
- emqx1
- emqx2
@ -12,4 +12,3 @@ services:
emqx_bridge:
volumes:
- ./python:/scripts

View File

@ -23,6 +23,7 @@ services:
- ./rocketmq/logs:/opt/logs
- ./rocketmq/store:/opt/store
- ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf
- ./rocketmq/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.9.4/conf/plain_acl.yml
environment:
NAMESRV_ADDR: "rocketmq_namesrv:9876"
JAVA_OPTS: " -Duser.home=/opt -Drocketmq.broker.diskSpaceWarningLevelRatio=0.99"

View File

@ -3,7 +3,7 @@ version: '3.9'
services:
erlang:
container_name: erlang
image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.0-33:1.13.4-24.3.4.2-3-ubuntu20.04}
image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu20.04}
env_file:
- conf.env
environment:

View File

@ -18,13 +18,13 @@ else
fi
apk update && apk add git curl
git clone -b develop-4.0 https://github.com/emqx/paho.mqtt.testing.git /paho.mqtt.testing
pip install pytest==6.2.5
git clone -b develop-5.0 https://github.com/emqx/paho.mqtt.testing.git /paho.mqtt.testing
pip install pytest==7.1.2 pytest-retry
pytest -v /paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "$TARGET_HOST"
pytest --retries 3 -v /paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "$TARGET_HOST"
RESULT=$?
pytest -v /paho.mqtt.testing/interoperability/test_client --host "$TARGET_HOST"
pytest --retries 3 -v /paho.mqtt.testing/interoperability/test_client --host "$TARGET_HOST"
RESULT=$(( RESULT + $? ))
# pytest -v /paho.mqtt.testing/interoperability/test_cluster --host1 "node1.emqx.io" --host2 "node2.emqx.io"

View File

@ -20,3 +20,5 @@ maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
aclEnable=true

View File

@ -0,0 +1,11 @@
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: PUB|SUB
topicPerms:
- TopicTest=PUB|SUB

View File

@ -3,7 +3,7 @@ inputs:
profile: # emqx, emqx-enterprise
required: true
type: string
otp: # 25.1.2-2, 24.3.4.2-2
otp: # 25.3.2-1
required: true
type: string
os:

View File

@ -25,7 +25,7 @@ jobs:
prepare:
runs-on: ubuntu-22.04
# prepare source with any OTP version, no need for a matrix
container: "ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-24.3.4.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
outputs:
PROFILE: ${{ steps.get_profile.outputs.PROFILE }}
@ -120,12 +120,12 @@ jobs:
# NOTE: 'otp' and 'elixir' are to configure emqx-builder image
# only support latest otp and elixir, not a matrix
builder:
- 5.0-35 # update to latest
- 5.1-0 # update to latest
otp:
- 24.3.4.2-3 # switch to 25 once ready to release 5.1
- 25.3.2-1
elixir:
- 'no_elixir'
- '1.13.4' # update to latest
- '1.14.5' # update to latest
exclude: # TODO: publish enterprise to ecr too?
- registry: 'public.ecr.aws'
profile: emqx-enterprise

View File

@ -21,7 +21,7 @@ on:
jobs:
prepare:
runs-on: ubuntu-22.04
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-24.3.4.2-3-ubuntu22.04
container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04
outputs:
BUILD_PROFILE: ${{ steps.get_profile.outputs.BUILD_PROFILE }}
IS_EXACT_TAG: ${{ steps.get_profile.outputs.IS_EXACT_TAG }}
@ -90,7 +90,7 @@ jobs:
- uses: ilammy/msvc-dev-cmd@v1.12.0
- uses: erlef/setup-beam@v1.15.2
with:
otp-version: 24.3.4.6
otp-version: 25.3.2
- name: build
env:
PYTHON: python
@ -132,7 +132,7 @@ jobs:
profile:
- ${{ needs.prepare.outputs.BUILD_PROFILE }}
otp:
- 24.3.4.2-3
- 25.3.2-1
os:
- macos-11
- macos-12
@ -173,7 +173,7 @@ jobs:
profile:
- ${{ needs.prepare.outputs.BUILD_PROFILE }}
otp:
- 24.3.4.2-3
- 25.3.2-1
arch:
- amd64
- arm64
@ -191,9 +191,9 @@ jobs:
- aws-arm64
- ubuntu-22.04
builder:
- 5.0-35
- 5.1-0
elixir:
- 1.13.4
- 1.14.5
exclude:
- arch: arm64
build_machine: ubuntu-22.04
@ -201,20 +201,20 @@ jobs:
build_machine: aws-arm64
include:
- profile: emqx
otp: 25.1.2-3
otp: 25.3.2-1
arch: amd64
os: ubuntu22.04
build_machine: ubuntu-22.04
builder: 5.0-35
elixir: 1.13.4
builder: 5.1-0
elixir: 1.14.5
release_with: elixir
- profile: emqx
otp: 25.1.2-3
otp: 25.3.2-1
arch: amd64
os: amzn2
build_machine: ubuntu-22.04
builder: 5.0-35
elixir: 1.13.4
builder: 5.1-0
elixir: 1.14.5
release_with: elixir
defaults:

View File

@ -25,16 +25,16 @@ jobs:
- ['emqx', 'master']
- ['emqx-enterprise', 'release-51']
otp:
- 24.3.4.2-3
- 25.3.2-1
arch:
- amd64
os:
- debian10
- amzn2
builder:
- 5.0-35
- 5.1-0
elixir:
- 1.13.4
- 1.14.5
defaults:
run:
@ -92,7 +92,7 @@ jobs:
branch:
- master
otp:
- 24.3.4.2-3
- 25.3.2-1
os:
- macos-12
- macos-12-arm64

View File

@ -30,14 +30,14 @@ jobs:
fail-fast: false
matrix:
profile:
- ["emqx", "24.3.4.2-3", "el7", "erlang"]
- ["emqx", "25.1.2-3", "ubuntu22.04", "elixir"]
- ["emqx-enterprise", "24.3.4.2-3", "amzn2", "erlang"]
- ["emqx-enterprise", "25.1.2-3", "ubuntu20.04", "erlang"]
- ["emqx", "25.3.2-1", "el7", "erlang"]
- ["emqx", "25.3.2-1", "ubuntu22.04", "elixir"]
- ["emqx-enterprise", "25.3.2-1", "amzn2", "erlang"]
- ["emqx-enterprise", "25.3.2-1", "ubuntu20.04", "erlang"]
builder:
- 5.0-35
- 5.1-0
elixir:
- '1.13.4'
- '1.14.5'
container: "ghcr.io/emqx/emqx-builder/${{ matrix.builder }}:${{ matrix.elixir }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"
@ -92,7 +92,7 @@ jobs:
profile:
- emqx
otp:
- 25.1.2
- 25.3.2
steps:
- uses: actions/checkout@v3
- uses: ilammy/msvc-dev-cmd@v1.12.0
@ -138,7 +138,7 @@ jobs:
- emqx
- emqx-enterprise
otp:
- 24.3.4.2-3
- 25.3.2-1
os:
- macos-11
- macos-12-arm64
@ -165,7 +165,7 @@ jobs:
path: _packages/**/*
docker:
runs-on: ubuntu-22.04
runs-on: aws-amd64
strategy:
fail-fast: false
@ -196,12 +196,17 @@ jobs:
tags: ${{ env.EMQX_IMAGE_TAG }}
build-args: |
EMQX_NAME=${{ env.EMQX_NAME }}
- name: test docker image
- name: smoke test
run: |
CID=$(docker run -d --rm -P $EMQX_IMAGE_TAG)
HTTP_PORT=$(docker inspect --format='{{(index (index .NetworkSettings.Ports "18083/tcp") 0).HostPort}}' $CID)
./scripts/test/emqx-smoke-test.sh localhost $HTTP_PORT
docker stop $CID
- name: dashboard tests
working-directory: ./scripts/ui-tests
run: |
set -eu
docker compose up --abort-on-container-exit --exit-code-from selenium
- name: test two nodes cluster with proto_dist=inet_tls in docker
run: |
./scripts/test/start-two-nodes-in-docker.sh -P $EMQX_IMAGE_TAG $EMQX_IMAGE_OLD_VERSION_TAG
@ -216,6 +221,11 @@ jobs:
with:
name: "${{ matrix.profile[0] }}-docker"
path: "${{ env.EMQX_NAME }}-${{ env.PKG_VSN }}.tar.gz"
- name: cleanup
if: always()
working-directory: ./scripts/ui-tests
run: |
docker compose rm -fs
spellcheck:
needs: linux

View File

@ -6,7 +6,7 @@ on:
jobs:
check_deps_integrity:
runs-on: ubuntu-22.04
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu22.04
container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04
steps:
- uses: actions/checkout@v3

View File

@ -5,7 +5,7 @@ on: [pull_request]
jobs:
code_style_check:
runs-on: ubuntu-22.04
container: "ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
steps:
- uses: actions/checkout@v3
with:

View File

@ -9,7 +9,7 @@ jobs:
elixir_apps_check:
runs-on: ubuntu-22.04
# just use the latest builder
container: "ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
strategy:
fail-fast: false

View File

@ -8,7 +8,7 @@ on:
jobs:
elixir_deps_check:
runs-on: ubuntu-22.04
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu22.04
container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04
steps:
- name: Checkout

View File

@ -17,7 +17,7 @@ jobs:
profile:
- emqx
- emqx-enterprise
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu22.04
container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04
steps:
- name: Checkout
uses: actions/checkout@v3

View File

@ -15,7 +15,7 @@ jobs:
prepare:
runs-on: ubuntu-latest
if: github.repository_owner == 'emqx'
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu20.04
container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu20.04
outputs:
BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }}
PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }}

View File

@ -26,7 +26,7 @@ jobs:
profile:
- emqx
- emqx-enterprise
container: "ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-25.1.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
steps:
- uses: AutoModality/action-clean@v1
- uses: actions/checkout@v3

View File

@ -12,14 +12,14 @@ jobs:
strategy:
matrix:
builder:
- 5.0-35
- 5.1-0
otp:
- 25.1.2-3
- 25.3.2-1
# no need to use more than 1 version of Elixir, since tests
# run using only Erlang code. This is needed just to specify
# the base image.
elixir:
- 1.13.4
- 1.14.5
os:
- ubuntu22.04
arch:

View File

@ -17,7 +17,7 @@ jobs:
prepare:
runs-on: ubuntu-22.04
# prepare source with any OTP version, no need for a matrix
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-24.3.4.2-3-debian11
container: ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-debian11
steps:
- uses: actions/checkout@v3
@ -50,17 +50,17 @@ jobs:
os:
- ["debian11", "debian:11-slim"]
builder:
- 5.0-35
- 5.1-0
otp:
- 24.3.4.2-3
- 25.3.2-1
elixir:
- 1.13.4
- 1.14.5
arch:
- amd64
steps:
- uses: erlef/setup-beam@v1.15.2
with:
otp-version: 24.3.4.6
otp-version: 25.3.2
- uses: actions/download-artifact@v3
with:
name: source
@ -123,11 +123,11 @@ jobs:
os:
- ["debian11", "debian:11-slim"]
builder:
- 5.0-35
- 5.1-0
otp:
- 24.3.4.2-3
- 25.3.2-1
elixir:
- 1.13.4
- 1.14.5
arch:
- amd64
# - emqx-enterprise # TODO test enterprise
@ -135,7 +135,7 @@ jobs:
steps:
- uses: erlef/setup-beam@v1.15.2
with:
otp-version: 24.3.4.6
otp-version: 25.3.2
- uses: actions/download-artifact@v3
with:
name: source
@ -228,11 +228,11 @@ jobs:
- uses: actions/checkout@v3
with:
repository: emqx/paho.mqtt.testing
ref: develop-4.0
ref: develop-5.0
path: paho.mqtt.testing
- name: install pytest
run: |
pip install pytest
pip install pytest==7.1.2 pytest-retry
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: run paho test
timeout-minutes: 10
@ -250,6 +250,6 @@ jobs:
sleep 10
done
pytest -v paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "127.0.0.1"
pytest --retries 3 -v paho.mqtt.testing/interoperability/test_client/V5/test_connect.py -k test_basic --host "127.0.0.1"
- if: failure()
run: kubectl logs -l "app.kubernetes.io/instance=${{ matrix.profile }}" -c emqx --tail=1000

View File

@ -16,7 +16,7 @@ jobs:
steps:
- uses: erlef/setup-beam@v1.15.2
with:
otp-version: 24.3.4.6
otp-version: 25.3.2
- name: download jmeter
timeout-minutes: 3
env:
@ -59,7 +59,7 @@ jobs:
steps:
- uses: erlef/setup-beam@v1.15.2
with:
otp-version: 24.3.4.6
otp-version: 25.3.2
- uses: actions/checkout@v3
- uses: actions/download-artifact@v3
with:
@ -155,7 +155,7 @@ jobs:
steps:
- uses: erlef/setup-beam@v1.15.2
with:
otp-version: 24.3.4.6
otp-version: 25.3.2
- uses: actions/checkout@v3
- uses: actions/download-artifact@v3
with:
@ -261,7 +261,7 @@ jobs:
steps:
- uses: erlef/setup-beam@v1.15.2
with:
otp-version: 24.3.4.6
otp-version: 25.3.2
- uses: actions/checkout@v3
- uses: actions/download-artifact@v3
with:
@ -363,7 +363,7 @@ jobs:
steps:
- uses: erlef/setup-beam@v1.15.2
with:
otp-version: 24.3.4.6
otp-version: 25.3.2
- uses: actions/checkout@v3
- uses: actions/download-artifact@v3
with:
@ -462,7 +462,7 @@ jobs:
steps:
- uses: erlef/setup-beam@v1.15.2
with:
otp-version: 24.3.4.6
otp-version: 25.3.2
- uses: actions/checkout@v3
- uses: actions/download-artifact@v3
with:

View File

@ -15,7 +15,7 @@ concurrency:
jobs:
relup_test_plan:
runs-on: ubuntu-22.04
container: "ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-24.3.4.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
outputs:
CUR_EE_VSN: ${{ steps.find-versions.outputs.CUR_EE_VSN }}
OLD_VERSIONS: ${{ steps.find-versions.outputs.OLD_VERSIONS }}
@ -73,7 +73,7 @@ jobs:
# setup Erlang to run lux
- uses: erlef/setup-beam@v1.15.2
with:
otp-version: 24.3.4.6
otp-version: 25.3.2
- uses: actions/checkout@v3
with:
repository: hawk/lux

View File

@ -34,14 +34,14 @@ jobs:
MATRIX="$(echo "${APPS}" | jq -c '
[
(.[] | select(.profile == "emqx") | . + {
builder: "5.0-35",
otp: "25.1.2-3",
elixir: "1.13.4"
builder: "5.1-0",
otp: "25.3.2-1",
elixir: "1.14.5"
}),
(.[] | select(.profile == "emqx-enterprise") | . + {
builder: "5.0-35",
otp: ["24.3.4.2-3", "25.1.2-3"][],
elixir: "1.13.4"
builder: "5.1-0",
otp: ["25.3.2-1"][],
elixir: "1.14.5"
})
]
')"
@ -258,12 +258,12 @@ jobs:
- ct
- ct_docker
runs-on: ubuntu-22.04
container: "ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-24.3.4.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-ubuntu22.04"
steps:
- uses: AutoModality/action-clean@v1
- uses: actions/download-artifact@v3
with:
name: source-emqx-enterprise-24.3.4.2-3
name: source-emqx-enterprise-25.3.2-1
path: .
- name: unzip source code
run: unzip -q source.zip

View File

@ -1,2 +1,2 @@
erlang 24.3.4.2-3
elixir 1.13.4-otp-24
erlang 25.3.2-1
elixir 1.14.5-otp-25

View File

@ -2,7 +2,7 @@ REBAR = $(CURDIR)/rebar3
BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts
export EMQX_RELUP ?= true
export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.0-28:1.13.4-24.3.4.2-2-debian11
export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.1-0:1.14.5-25.3.2-1-debian11
export EMQX_DEFAULT_RUNNER = debian:11-slim
export EMQX_REL_FORM ?= tgz
export QUICER_DOWNLOAD_FROM_RELEASE = 1

View File

@ -20,6 +20,7 @@
-include_lib("emqx/include/logger.hrl").
-define(AUTHN_TRACE_TAG, "AUTHN").
-define(GLOBAL, 'mqtt:global').
-define(TRACE_AUTHN_PROVIDER(Msg), ?TRACE_AUTHN_PROVIDER(Msg, #{})).
-define(TRACE_AUTHN_PROVIDER(Msg, Meta), ?TRACE_AUTHN_PROVIDER(debug, Msg, Meta)).

View File

@ -29,6 +29,7 @@
{emqx_management,3}.
{emqx_management,4}.
{emqx_mgmt_api_plugins,1}.
{emqx_mgmt_api_plugins,2}.
{emqx_mgmt_cluster,1}.
{emqx_mgmt_trace,1}.
{emqx_mgmt_trace,2}.

View File

@ -239,14 +239,30 @@ remove_config([RootName | _] = KeyPath, Opts) ->
-spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset_config([RootName | _] = KeyPath, Opts) ->
reset_config([RootName | SubKeys] = KeyPath, Opts) ->
case emqx_config:get_default_value(KeyPath) of
{ok, Default} ->
Mod = emqx_config:get_schema_mod(RootName),
case SubKeys =:= [] of
true ->
emqx_config_handler:update_config(
emqx_config:get_schema_mod(RootName),
Mod,
KeyPath,
{{update, Default}, Opts}
);
false ->
NewConf =
emqx_utils_maps:deep_put(
SubKeys,
emqx_config:get_raw([RootName], #{}),
Default
),
emqx_config_handler:update_config(
Mod,
[RootName],
{{update, NewConf}, Opts}
)
end;
{error, _} = Error ->
Error
end.

View File

@ -60,7 +60,8 @@
update_authenticator/3,
lookup_authenticator/2,
list_authenticators/1,
move_authenticator/3
move_authenticator/3,
reorder_authenticator/2
]).
%% APIs for observer built_in_database
@ -86,12 +87,6 @@
%% utility functions
-export([authenticator_id/1, metrics_id/2]).
%% proxy callback
-export([
pre_config_update/3,
post_config_update/5
]).
-export_type([
authenticator_id/0,
position/0,
@ -275,12 +270,6 @@ get_enabled(Authenticators) ->
%% APIs
%%------------------------------------------------------------------------------
pre_config_update(Path, UpdateReq, OldConfig) ->
emqx_authentication_config:pre_config_update(Path, UpdateReq, OldConfig).
post_config_update(Path, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
emqx_authentication_config:post_config_update(Path, UpdateReq, NewConfig, OldConfig, AppEnvs).
%% @doc Get all registered authentication providers.
get_providers() ->
call(get_providers).
@ -413,6 +402,12 @@ list_authenticators(ChainName) ->
move_authenticator(ChainName, AuthenticatorID, Position) ->
call({move_authenticator, ChainName, AuthenticatorID, Position}).
-spec reorder_authenticator(chain_name(), [authenticator_id()]) -> ok.
reorder_authenticator(_ChainName, []) ->
ok;
reorder_authenticator(ChainName, AuthenticatorIDs) ->
call({reorder_authenticator, ChainName, AuthenticatorIDs}).
-spec import_users(chain_name(), authenticator_id(), {binary(), binary()}) ->
ok | {error, term()}.
import_users(ChainName, AuthenticatorID, Filename) ->
@ -447,8 +442,9 @@ list_users(ChainName, AuthenticatorID, FuzzyParams) ->
init(_Opts) ->
process_flag(trap_exit, true),
ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE),
ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], ?MODULE),
Module = emqx_authentication_config,
ok = emqx_config_handler:add_handler([?CONF_ROOT], Module),
ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], Module),
{ok, #{hooked => false, providers => #{}}}.
handle_call(get_providers, _From, #{providers := Providers} = State) ->
@ -504,6 +500,12 @@ handle_call({move_authenticator, ChainName, AuthenticatorID, Position}, _From, S
end,
Reply = with_chain(ChainName, UpdateFun),
reply(Reply, State);
handle_call({reorder_authenticator, ChainName, AuthenticatorIDs}, _From, State) ->
UpdateFun = fun(Chain) ->
handle_reorder_authenticator(Chain, AuthenticatorIDs)
end,
Reply = with_chain(ChainName, UpdateFun),
reply(Reply, State);
handle_call({import_users, ChainName, AuthenticatorID, Filename}, _From, State) ->
Reply = call_authenticator(ChainName, AuthenticatorID, import_users, [Filename]),
reply(Reply, State);
@ -609,6 +611,24 @@ handle_move_authenticator(Chain, AuthenticatorID, Position) ->
{error, Reason}
end.
handle_reorder_authenticator(Chain, AuthenticatorIDs) ->
#chain{authenticators = Authenticators} = Chain,
NAuthenticators =
lists:filtermap(
fun(ID) ->
case lists:keyfind(ID, #authenticator.id, Authenticators) of
false ->
?SLOG(error, #{msg => "authenticator_not_found", id => ID}),
false;
Authenticator ->
{true, Authenticator}
end
end,
AuthenticatorIDs
),
NewChain = Chain#chain{authenticators = NAuthenticators},
{ok, ok, NewChain}.
handle_create_authenticator(Chain, Config, Providers) ->
#chain{name = Name, authenticators = Authenticators} = Chain,
AuthenticatorID = authenticator_id(Config),

View File

@ -65,8 +65,8 @@
-spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) ->
{ok, map() | list()} | {error, term()}.
pre_config_update(_, UpdateReq, OldConfig) ->
try do_pre_config_update(UpdateReq, to_list(OldConfig)) of
pre_config_update(Paths, UpdateReq, OldConfig) ->
try do_pre_config_update(Paths, UpdateReq, to_list(OldConfig)) of
{error, Reason} -> {error, Reason};
{ok, NewConfig} -> {ok, NewConfig}
catch
@ -74,9 +74,9 @@ pre_config_update(_, UpdateReq, OldConfig) ->
{error, Reason}
end.
do_pre_config_update({create_authenticator, ChainName, Config}, OldConfig) ->
do_pre_config_update(_, {create_authenticator, ChainName, Config}, OldConfig) ->
NewId = authenticator_id(Config),
case lists:filter(fun(OldConfig0) -> authenticator_id(OldConfig0) =:= NewId end, OldConfig) of
case filter_authenticator(NewId, OldConfig) of
[] ->
CertsDir = certs_dir(ChainName, Config),
NConfig = convert_certs(CertsDir, Config),
@ -84,7 +84,7 @@ do_pre_config_update({create_authenticator, ChainName, Config}, OldConfig) ->
[_] ->
{error, {already_exists, {authenticator, NewId}}}
end;
do_pre_config_update({delete_authenticator, _ChainName, AuthenticatorID}, OldConfig) ->
do_pre_config_update(_, {delete_authenticator, _ChainName, AuthenticatorID}, OldConfig) ->
NewConfig = lists:filter(
fun(OldConfig0) ->
AuthenticatorID =/= authenticator_id(OldConfig0)
@ -92,7 +92,7 @@ do_pre_config_update({delete_authenticator, _ChainName, AuthenticatorID}, OldCon
OldConfig
),
{ok, NewConfig};
do_pre_config_update({update_authenticator, ChainName, AuthenticatorID, Config}, OldConfig) ->
do_pre_config_update(_, {update_authenticator, ChainName, AuthenticatorID, Config}, OldConfig) ->
CertsDir = certs_dir(ChainName, AuthenticatorID),
NewConfig = lists:map(
fun(OldConfig0) ->
@ -104,7 +104,7 @@ do_pre_config_update({update_authenticator, ChainName, AuthenticatorID, Config},
OldConfig
),
{ok, NewConfig};
do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}, OldConfig) ->
do_pre_config_update(_, {move_authenticator, _ChainName, AuthenticatorID, Position}, OldConfig) ->
case split_by_id(AuthenticatorID, OldConfig) of
{error, Reason} ->
{error, Reason};
@ -129,7 +129,18 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
{ok, BeforeNFound ++ [FoundRelated, Found | AfterNFound]}
end
end
end.
end;
do_pre_config_update(_, OldConfig, OldConfig) ->
{ok, OldConfig};
do_pre_config_update(Paths, NewConfig, _OldConfig) ->
ChainName = chain_name(Paths),
{ok, [
begin
CertsDir = certs_dir(ChainName, New),
convert_certs(CertsDir, New)
end
|| New <- to_list(NewConfig)
]}.
-spec post_config_update(
list(atom()),
@ -139,13 +150,16 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
emqx_config:app_envs()
) ->
ok | {ok, map()} | {error, term()}.
post_config_update(_, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
do_post_config_update(UpdateReq, to_list(NewConfig), OldConfig, AppEnvs).
post_config_update(Paths, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
do_post_config_update(Paths, UpdateReq, to_list(NewConfig), OldConfig, AppEnvs).
do_post_config_update({create_authenticator, ChainName, Config}, NewConfig, _OldConfig, _AppEnvs) ->
do_post_config_update(
_, {create_authenticator, ChainName, Config}, NewConfig, _OldConfig, _AppEnvs
) ->
NConfig = get_authenticator_config(authenticator_id(Config), NewConfig),
emqx_authentication:create_authenticator(ChainName, NConfig);
do_post_config_update(
_,
{delete_authenticator, ChainName, AuthenticatorID},
_NewConfig,
OldConfig,
@ -160,6 +174,7 @@ do_post_config_update(
{error, Reason}
end;
do_post_config_update(
_,
{update_authenticator, ChainName, AuthenticatorID, Config},
NewConfig,
_OldConfig,
@ -172,12 +187,57 @@ do_post_config_update(
emqx_authentication:update_authenticator(ChainName, AuthenticatorID, NConfig)
end;
do_post_config_update(
_,
{move_authenticator, ChainName, AuthenticatorID, Position},
_NewConfig,
_OldConfig,
_AppEnvs
) ->
emqx_authentication:move_authenticator(ChainName, AuthenticatorID, Position).
emqx_authentication:move_authenticator(ChainName, AuthenticatorID, Position);
do_post_config_update(_, _UpdateReq, OldConfig, OldConfig, _AppEnvs) ->
ok;
do_post_config_update(Paths, _UpdateReq, NewConfig0, OldConfig0, _AppEnvs) ->
ChainName = chain_name(Paths),
OldConfig = to_list(OldConfig0),
NewConfig = to_list(NewConfig0),
OldIds = lists:map(fun authenticator_id/1, OldConfig),
NewIds = lists:map(fun authenticator_id/1, NewConfig),
ok = delete_authenticators(NewIds, ChainName, OldConfig),
ok = create_or_update_authenticators(OldIds, ChainName, NewConfig),
ok = emqx_authentication:reorder_authenticator(ChainName, NewIds),
ok.
%% create new authenticators and update existing ones
create_or_update_authenticators(OldIds, ChainName, NewConfig) ->
lists:foreach(
fun(Conf) ->
Id = authenticator_id(Conf),
case lists:member(Id, OldIds) of
true ->
emqx_authentication:update_authenticator(ChainName, Id, Conf);
false ->
emqx_authentication:create_authenticator(ChainName, Conf)
end
end,
NewConfig
).
%% delete authenticators that are not in the new config
delete_authenticators(NewIds, ChainName, OldConfig) ->
lists:foreach(
fun(Conf) ->
Id = authenticator_id(Conf),
case lists:member(Id, NewIds) of
true ->
ok;
false ->
_ = emqx_authentication:delete_authenticator(ChainName, Id),
CertsDir = certs_dir(ChainName, Conf),
ok = clear_certs(CertsDir, Conf)
end
end,
OldConfig
).
to_list(undefined) -> [];
to_list(M) when M =:= #{} -> [];
@ -213,14 +273,15 @@ clear_certs(CertsDir, Config) ->
ok = emqx_tls_lib:delete_ssl_files(CertsDir, undefined, OldSSL).
get_authenticator_config(AuthenticatorID, AuthenticatorsConfig) ->
case
lists:filter(fun(C) -> AuthenticatorID =:= authenticator_id(C) end, AuthenticatorsConfig)
of
case filter_authenticator(AuthenticatorID, AuthenticatorsConfig) of
[C] -> C;
[] -> {error, not_found};
_ -> error({duplicated_authenticator_id, AuthenticatorsConfig})
end.
filter_authenticator(ID, Authenticators) ->
lists:filter(fun(A) -> ID =:= authenticator_id(A) end, Authenticators).
split_by_id(ID, AuthenticatorsConfig) ->
case
lists:foldl(
@ -287,3 +348,8 @@ dir(ChainName, ID) when is_binary(ID) ->
emqx_utils:safe_filename(iolist_to_binary([to_bin(ChainName), "-", ID]));
dir(ChainName, Config) when is_map(Config) ->
dir(ChainName, authenticator_id(Config)).
chain_name([authentication]) ->
?GLOBAL;
chain_name([listeners, Type, Name, authentication]) ->
binary_to_existing_atom(<<(atom_to_binary(Type))/binary, ":", (atom_to_binary(Name))/binary>>).

View File

@ -176,11 +176,13 @@ insert_channel_info(ClientId, Info, Stats) ->
%% Note that: It should be called on a lock transaction
register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
Chan = {ClientId, ChanPid},
%% cast (for process monitor) before inserting ets tables
cast({registered, Chan}),
true = ets:insert(?CHAN_TAB, Chan),
true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}),
ok = emqx_cm_registry:register_channel(Chan),
mark_channel_connected(ChanPid),
cast({registered, Chan}).
ok.
%% @doc Unregister a channel.
-spec unregister_channel(emqx_types:clientid()) -> ok.

View File

@ -280,25 +280,28 @@ get_default_value([RootName | _] = KeyPath) ->
end.
-spec get_raw(emqx_utils_maps:config_key_path()) -> term().
get_raw([Root | T]) when is_atom(Root) -> get_raw([bin(Root) | T]);
get_raw(KeyPath) -> do_get_raw(KeyPath).
get_raw([Root | _] = KeyPath) when is_binary(Root) -> do_get_raw(KeyPath);
get_raw([Root | T]) -> get_raw([bin(Root) | T]);
get_raw([]) -> do_get_raw([]).
-spec get_raw(emqx_utils_maps:config_key_path(), term()) -> term().
get_raw([Root | T], Default) when is_atom(Root) -> get_raw([bin(Root) | T], Default);
get_raw(KeyPath, Default) -> do_get_raw(KeyPath, Default).
get_raw([Root | _] = KeyPath, Default) when is_binary(Root) -> do_get_raw(KeyPath, Default);
get_raw([Root | T], Default) -> get_raw([bin(Root) | T], Default);
get_raw([], Default) -> do_get_raw([], Default).
-spec put_raw(map()) -> ok.
put_raw(Config) ->
maps:fold(
fun(RootName, RootV, _) ->
?MODULE:put_raw([RootName], RootV)
?MODULE:put_raw([bin(RootName)], RootV)
end,
ok,
hocon_maps:ensure_plain(Config)
).
-spec put_raw(emqx_utils_maps:config_key_path(), term()) -> ok.
put_raw(KeyPath, Config) ->
put_raw(KeyPath0, Config) ->
KeyPath = [bin(K) || K <- KeyPath0],
Putter = fun(Path, Map, Value) ->
emqx_utils_maps:deep_force_put(Path, Map, Value)
end,

View File

@ -31,8 +31,7 @@
remove_handler/1,
update_config/3,
get_raw_cluster_override_conf/0,
info/0,
merge_to_old_config/2
info/0
]).
%% gen_server callbacks
@ -332,7 +331,6 @@ do_post_config_update(
SubOldConf = get_sub_config(ConfKey, OldConf),
SubNewConf = get_sub_config(ConfKey, NewConf),
SubHandlers = get_sub_handlers(ConfKey, Handlers),
case
do_post_config_update(
SubConfKeyPath,
SubHandlers,
@ -342,21 +340,7 @@ do_post_config_update(
UpdateArgs,
Result,
ConfKeyPath
)
of
{ok, Result1} ->
call_post_config_update(
Handlers,
OldConf,
NewConf,
AppEnvs,
up_req(UpdateArgs),
Result1,
ConfKeyPath
);
Error ->
Error
end.
).
get_sub_handlers(ConfKey, Handlers) ->
case maps:find(ConfKey, Handlers) of

View File

@ -45,8 +45,8 @@ define(Term, _) ->
Term.
%% @doc Apply a function to a maybe argument.
-spec apply(fun((A) -> maybe(A)), maybe(A)) ->
maybe(A).
-spec apply(fun((A) -> B), maybe(A)) ->
maybe(B).
apply(_Fun, undefined) ->
undefined;
apply(Fun, Term) when is_function(Fun) ->

View File

@ -174,7 +174,7 @@ t_authenticator(Config) when is_list(Config) ->
register_provider(AuthNType1, ?MODULE),
ID1 = <<"password_based:built_in_database">>,
% CRUD of authencaticator
% CRUD of authenticator
?assertMatch(
{ok, #{id := ID1, state := #{mark := 1}}},
?AUTHN:create_authenticator(ChainName, AuthenticatorConfig1)
@ -296,7 +296,10 @@ t_update_config({init, Config}) ->
| Config
];
t_update_config(Config) when is_list(Config) ->
emqx_config_handler:add_handler([?CONF_ROOT], emqx_authentication),
emqx_config_handler:add_handler([?CONF_ROOT], emqx_authentication_config),
ok = emqx_config_handler:add_handler(
[listeners, '?', '?', ?CONF_ROOT], emqx_authentication_config
),
ok = register_provider(?config("auth1"), ?MODULE),
ok = register_provider(?config("auth2"), ?MODULE),
Global = ?config(global),
@ -355,6 +358,10 @@ t_update_config(Config) when is_list(Config) ->
?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(Global)),
[Raw2, Raw1] = emqx:get_raw_config([?CONF_ROOT]),
?assertMatch({ok, _}, update_config([?CONF_ROOT], [Raw1, Raw2])),
?assertMatch({ok, [#{id := ID1}, #{id := ID2}]}, ?AUTHN:list_authenticators(Global)),
?assertMatch({ok, _}, update_config([?CONF_ROOT], {delete_authenticator, Global, ID1})),
?assertEqual(
{error, {not_found, {authenticator, ID1}}},
@ -417,11 +424,16 @@ t_update_config(Config) when is_list(Config) ->
{ok, _},
update_config(ConfKeyPath, {move_authenticator, ListenerID, ID2, ?CMD_MOVE_FRONT})
),
?assertMatch(
{ok, [#{id := ID2}, #{id := ID1}]},
?AUTHN:list_authenticators(ListenerID)
),
[LRaw2, LRaw1] = emqx:get_raw_config(ConfKeyPath),
?assertMatch({ok, _}, update_config(ConfKeyPath, [LRaw1, LRaw2])),
?assertMatch(
{ok, [#{id := ID1}, #{id := ID2}]},
?AUTHN:list_authenticators(ListenerID)
),
?assertMatch(
{ok, _},

View File

@ -277,7 +277,7 @@ wait_for_app_processes(_) ->
%% and stop others, and then the `application:start/2' callback is
%% never called again for this application.
perform_sanity_checks(emqx_rule_engine) ->
ensure_config_handler(emqx_rule_engine, [rule_engine, rules]),
ensure_config_handler(emqx_rule_engine, [rule_engine, rules, '?']),
ok;
perform_sanity_checks(emqx_bridge) ->
ensure_config_handler(emqx_bridge, [bridges]),
@ -289,7 +289,7 @@ ensure_config_handler(Module, ConfigPath) ->
#{handlers := Handlers} = sys:get_state(emqx_config_handler),
case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of
#{{mod} := Module} -> ok;
_NotFound -> error({config_handler_missing, ConfigPath, Module})
NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound})
end,
ok.

View File

@ -130,7 +130,7 @@ t_root_key_update(_Config) ->
?assertEqual(
{ok, #{
config => 0.81,
post_config_update => #{?MODULE => ok},
post_config_update => #{},
raw_config => <<"81%">>
}},
emqx:update_config(SubKey, "81%", Opts)
@ -174,7 +174,7 @@ t_sub_key_update_remove(_Config) ->
%% remove
?assertEqual(
{ok, #{post_config_update => #{emqx_config_handler_SUITE => ok}}},
{ok, #{post_config_update => #{?MODULE => ok}}},
emqx:remove_config(KeyPath)
),
?assertError(
@ -184,18 +184,6 @@ t_sub_key_update_remove(_Config) ->
?assertEqual(false, lists:member(<<"cpu_check_interval">>, OSKey)),
?assert(length(OSKey) > 0),
?assertEqual(
{ok, #{
config => 60000,
post_config_update => #{?MODULE => ok},
raw_config => <<"60s">>
}},
emqx:reset_config(KeyPath, Opts)
),
OSKey1 = maps:keys(emqx:get_raw_config([sysmon, os])),
?assertEqual(true, lists:member(<<"cpu_check_interval">>, OSKey1)),
?assert(length(OSKey1) > 1),
ok = emqx_config_handler:remove_handler(KeyPath),
ok = emqx_config_handler:remove_handler(KeyPath2),
ok.
@ -292,44 +280,6 @@ t_get_raw_cluster_override_conf(_Config) ->
?assertEqual(OldInfo, NewInfo),
ok.
t_save_config_failed(_Config) ->
ok.
t_update_sub(_Config) ->
PathKey = [sysmon],
Opts = #{rawconf_with_defaults => true},
ok = emqx_config_handler:add_handler(PathKey, ?MODULE),
%% update sub key
#{<<"os">> := OS1} = emqx:get_raw_config(PathKey),
{ok, Res} = emqx:update_config(PathKey ++ [os, cpu_check_interval], <<"120s">>, Opts),
?assertMatch(
#{
config := 120000,
post_config_update := #{?MODULE := ok},
raw_config := <<"120s">>
},
Res
),
?assertMatch(#{os := #{cpu_check_interval := 120000}}, emqx:get_config(PathKey)),
#{<<"os">> := OS2} = emqx:get_raw_config(PathKey),
?assertEqual(lists:sort(maps:keys(OS1)), lists:sort(maps:keys(OS2))),
%% update sub key
SubKey = PathKey ++ [os, cpu_high_watermark],
?assertEqual(
{ok, #{
config => 0.81,
post_config_update => #{?MODULE => ok},
raw_config => <<"81%">>
}},
emqx:update_config(SubKey, "81%", Opts)
),
?assertEqual(0.81, emqx:get_config(SubKey)),
?assertEqual("81%", emqx:get_raw_config(SubKey)),
ok = emqx_config_handler:remove_handler(PathKey),
ok.
pre_config_update([sysmon], UpdateReq, _RawConf) ->
{ok, UpdateReq};
pre_config_update([sysmon, os], UpdateReq, _RawConf) ->

View File

@ -279,7 +279,7 @@ does_module_exist(Mod) ->
clear_listeners() ->
emqx_config:put([listeners], #{}),
emqx_config:put_raw([listeners], #{}),
emqx_config:put_raw([<<"listeners">>], #{}),
ok.
assert_http_get(URL) ->

View File

@ -23,8 +23,6 @@
-define(AUTHN, emqx_authentication).
-define(GLOBAL, 'mqtt:global').
-define(RE_PLACEHOLDER, "\\$\\{[a-z0-9\\-]+\\}").
-define(AUTH_SHARD, emqx_authn_shard).

View File

@ -31,6 +31,7 @@
-define(NOT_FOUND, 'NOT_FOUND').
-define(ALREADY_EXISTS, 'ALREADY_EXISTS').
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
-define(CONFIG, emqx_authentication_config).
% Swagger
@ -833,12 +834,12 @@ with_chain(ListenerID, Fun) ->
create_authenticator(ConfKeyPath, ChainName, Config) ->
case update_config(ConfKeyPath, {create_authenticator, ChainName, Config}) of
{ok, #{
post_config_update := #{emqx_authentication := #{id := ID}},
post_config_update := #{?CONFIG := #{id := ID}},
raw_config := AuthenticatorsConfig
}} ->
{ok, AuthenticatorConfig} = find_config(ID, AuthenticatorsConfig),
{200, maps:put(id, ID, convert_certs(fill_defaults(AuthenticatorConfig)))};
{error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
{error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
serialize_error(Reason);
{error, Reason} ->
serialize_error(Reason)
@ -1017,7 +1018,7 @@ update_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Config) ->
of
{ok, _} ->
{204};
{error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
{error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
serialize_error(Reason);
{error, Reason} ->
serialize_error(Reason)
@ -1027,7 +1028,7 @@ delete_authenticator(ConfKeyPath, ChainName, AuthenticatorID) ->
case update_config(ConfKeyPath, {delete_authenticator, ChainName, AuthenticatorID}) of
{ok, _} ->
{204};
{error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
{error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
serialize_error(Reason);
{error, Reason} ->
serialize_error(Reason)
@ -1044,7 +1045,7 @@ move_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Position) ->
of
{ok, _} ->
{204};
{error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
{error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
serialize_error(Reason);
{error, Reason} ->
serialize_error(Reason)

View File

@ -200,6 +200,127 @@ t_union_selector_errors(Config) when is_list(Config) ->
),
ok.
t_update_conf({init, Config}) ->
emqx_common_test_helpers:start_apps([emqx_conf, emqx_authn]),
{ok, _} = emqx:update_config([authentication], []),
Config;
t_update_conf({'end', _Config}) ->
{ok, _} = emqx:update_config([authentication], []),
emqx_common_test_helpers:stop_apps([emqx_authn, emqx_conf]),
ok;
t_update_conf(Config) when is_list(Config) ->
Authn1 = #{
<<"mechanism">> => <<"password_based">>,
<<"backend">> => <<"built_in_database">>,
<<"user_id_type">> => <<"clientid">>,
<<"enable">> => true
},
Authn2 = #{
<<"mechanism">> => <<"password_based">>,
<<"backend">> => <<"http">>,
<<"method">> => <<"post">>,
<<"url">> => <<"http://127.0.0.1:18083">>,
<<"headers">> => #{
<<"content-type">> => <<"application/json">>
},
<<"enable">> => true
},
Authn3 = #{
<<"mechanism">> => <<"jwt">>,
<<"use_jwks">> => false,
<<"algorithm">> => <<"hmac-based">>,
<<"secret">> => <<"mysecret">>,
<<"secret_base64_encoded">> => false,
<<"verify_claims">> => #{<<"username">> => <<"${username}">>},
<<"enable">> => true
},
Chain = 'mqtt:global',
{ok, _} = emqx:update_config([authentication], [Authn1]),
?assertMatch(
{ok, #{
authenticators := [
#{
enable := true,
id := <<"password_based:built_in_database">>,
provider := emqx_authn_mnesia
}
]
}},
emqx_authentication:lookup_chain(Chain)
),
{ok, _} = emqx:update_config([authentication], [Authn1, Authn2, Authn3]),
?assertMatch(
{ok, #{
authenticators := [
#{
enable := true,
id := <<"password_based:built_in_database">>,
provider := emqx_authn_mnesia
},
#{
enable := true,
id := <<"password_based:http">>,
provider := emqx_authn_http
},
#{
enable := true,
id := <<"jwt">>,
provider := emqx_authn_jwt
}
]
}},
emqx_authentication:lookup_chain(Chain)
),
{ok, _} = emqx:update_config([authentication], [Authn2, Authn1]),
?assertMatch(
{ok, #{
authenticators := [
#{
enable := true,
id := <<"password_based:http">>,
provider := emqx_authn_http
},
#{
enable := true,
id := <<"password_based:built_in_database">>,
provider := emqx_authn_mnesia
}
]
}},
emqx_authentication:lookup_chain(Chain)
),
{ok, _} = emqx:update_config([authentication], [Authn3, Authn2, Authn1]),
?assertMatch(
{ok, #{
authenticators := [
#{
enable := true,
id := <<"jwt">>,
provider := emqx_authn_jwt
},
#{
enable := true,
id := <<"password_based:http">>,
provider := emqx_authn_http
},
#{
enable := true,
id := <<"password_based:built_in_database">>,
provider := emqx_authn_mnesia
}
]
}},
emqx_authentication:lookup_chain(Chain)
),
{ok, _} = emqx:update_config([authentication], []),
?assertMatch(
{error, {not_found, {chain, Chain}}},
emqx_authentication:lookup_chain(Chain)
),
ok.
parse(Bytes) ->
{ok, Frame, <<>>, {none, _}} = emqx_frame:parse(Bytes),
Frame.

View File

@ -43,6 +43,7 @@
-define(CMD_MOVE_BEFORE(Before), {before, Before}).
-define(CMD_MOVE_AFTER(After), {'after', After}).
-define(ROOT_KEY, [authorization]).
-define(CONF_KEY_PATH, [authorization, sources]).
-define(RE_PLACEHOLDER, "\\$\\{[a-z0-9_]+\\}").

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authz, [
{description, "An OTP application"},
{vsn, "0.1.21"},
{vsn, "0.1.22"},
{registered, []},
{mod, {emqx_authz_app, []}},
{applications, [

View File

@ -101,6 +101,7 @@ init() ->
ok = register_metrics(),
ok = init_metrics(client_info_source()),
emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE),
emqx_conf:add_handler(?ROOT_KEY, ?MODULE),
Sources = emqx_conf:get(?CONF_KEY_PATH, []),
ok = check_dup_types(Sources),
NSources = create_sources(Sources),
@ -109,6 +110,7 @@ init() ->
deinit() ->
ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}),
emqx_conf:remove_handler(?CONF_KEY_PATH),
emqx_conf:remove_handler(?ROOT_KEY),
emqx_authz_utils:cleanup_resources().
lookup() ->
@ -139,14 +141,29 @@ update({?CMD_DELETE, Type}, Sources) ->
update(Cmd, Sources) ->
emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}).
pre_config_update(_, Cmd, Sources) ->
try do_pre_config_update(Cmd, Sources) of
pre_config_update(Path, Cmd, Sources) ->
try do_pre_config_update(Path, Cmd, Sources) of
{error, Reason} -> {error, Reason};
NSources -> {ok, NSources}
catch
_:Reason -> {error, Reason}
end.
do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) ->
do_pre_config_update(Cmd, Sources);
do_pre_config_update(?ROOT_KEY, NewConf, OldConf) ->
do_pre_config_replace(NewConf, OldConf).
%% override the entire config when updating the root key
%% emqx_conf:update(?ROOT_KEY, Conf);
do_pre_config_replace(Conf, Conf) ->
Conf;
do_pre_config_replace(NewConf, OldConf) ->
#{<<"sources">> := NewSources} = NewConf,
#{<<"sources">> := OldSources} = OldConf,
NewSources1 = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources),
NewConf#{<<"sources">> := NewSources1}.
do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) ->
do_move(Cmd, Sources);
do_pre_config_update({?CMD_PREPEND, Source}, Sources) ->
@ -179,47 +196,53 @@ do_pre_config_update({Op, Source}, Sources) ->
post_config_update(_, _, undefined, _OldSource, _AppEnvs) ->
ok;
post_config_update(_, Cmd, NewSources, _OldSource, _AppEnvs) ->
Actions = do_post_config_update(Cmd, NewSources),
post_config_update(Path, Cmd, NewSources, _OldSource, _AppEnvs) ->
Actions = do_post_config_update(Path, Cmd, NewSources),
ok = update_authz_chain(Actions),
ok = emqx_authz_cache:drain_cache().
do_post_config_update({?CMD_MOVE, _Type, _Where} = Cmd, _Sources) ->
InitedSources = lookup(),
do_move(Cmd, InitedSources);
do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) ->
InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)),
%% create metrics
do_post_config_update(?CONF_KEY_PATH, {?CMD_MOVE, _Type, _Where} = Cmd, _Sources) ->
do_move(Cmd, lookup());
do_post_config_update(?CONF_KEY_PATH, {?CMD_PREPEND, RawNewSource}, Sources) ->
TypeName = type(RawNewSource),
ok = emqx_metrics_worker:create_metrics(
authz_metrics,
TypeName,
[total, allow, deny, nomatch],
[total]
),
[InitedNewSource] ++ lookup();
do_post_config_update({?CMD_APPEND, RawNewSource}, Sources) ->
InitedNewSource = create_source(get_source_by_type(type(RawNewSource), Sources)),
lookup() ++ [InitedNewSource];
do_post_config_update({{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
NewSources = create_sources([get_source_by_type(TypeName, Sources)]),
NewSources ++ lookup();
do_post_config_update(?CONF_KEY_PATH, {?CMD_APPEND, RawNewSource}, Sources) ->
NewSources = create_sources([get_source_by_type(type(RawNewSource), Sources)]),
lookup() ++ NewSources;
do_post_config_update(?CONF_KEY_PATH, {{?CMD_REPLACE, Type}, RawNewSource}, Sources) ->
OldSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldSources),
NewSource = get_source_by_type(type(RawNewSource), Sources),
InitedSources = update_source(type(RawNewSource), OldSource, NewSource),
Front ++ [InitedSources] ++ Rear;
do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
OldInitedSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldInitedSources),
%% delete metrics
ok = emqx_metrics_worker:clear_metrics(authz_metrics, Type),
ok = ensure_resource_deleted(OldSource),
clear_certs(OldSource),
ok = ensure_deleted(OldSource, #{clear_metric => true}),
Front ++ Rear;
do_post_config_update({?CMD_REPLACE, _RawNewSources}, Sources) ->
%% overwrite the entire config!
OldInitedSources = lookup(),
lists:foreach(fun ensure_resource_deleted/1, OldInitedSources),
lists:foreach(fun clear_certs/1, OldInitedSources),
do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) ->
overwrite_entire_sources(Sources);
do_post_config_update(?ROOT_KEY, Conf, Conf) ->
#{sources := Sources} = Conf,
Sources;
do_post_config_update(?ROOT_KEY, _Conf, NewConf) ->
#{sources := NewSources} = NewConf,
overwrite_entire_sources(NewSources).
overwrite_entire_sources(Sources) ->
PrevSources = lookup(),
NewSourcesTypes = lists:map(fun type/1, Sources),
EnsureDelete = fun(S) ->
TypeName = type(S),
Opts =
case lists:member(TypeName, NewSourcesTypes) of
true -> #{clear_metric => false};
false -> #{clear_metric => true}
end,
ensure_deleted(S, Opts)
end,
lists:foreach(EnsureDelete, PrevSources),
create_sources(Sources).
%% @doc do source move
@ -238,8 +261,14 @@ do_move({?CMD_MOVE, Type, ?CMD_MOVE_AFTER(After)}, Sources) ->
{S2, Front2, Rear2} = take(After, Front1 ++ Rear1),
Front2 ++ [S2, S1] ++ Rear2.
ensure_resource_deleted(#{enable := false}) ->
ensure_deleted(#{enable := false}, _) ->
ok;
ensure_deleted(Source, #{clear_metric := ClearMetric}) ->
TypeName = type(Source),
ensure_resource_deleted(Source),
clear_certs(Source),
ClearMetric andalso emqx_metrics_worker:clear_metrics(authz_metrics, TypeName).
ensure_resource_deleted(#{type := Type} = Source) ->
Module = authz_module(Type),
Module:destroy(Source).
@ -287,12 +316,18 @@ update_source(Type, OldSource, NewSource) ->
init_metrics(Source) ->
TypeName = type(Source),
case emqx_metrics_worker:has_metrics(authz_metrics, TypeName) of
%% Don't reset the metrics if it already exists
true ->
ok;
false ->
emqx_metrics_worker:create_metrics(
authz_metrics,
TypeName,
[total, allow, deny, nomatch],
[total]
).
)
end.
%%--------------------------------------------------------------------
%% AuthZ callbacks
@ -487,7 +522,9 @@ write_acl_file(#{<<"rules">> := Rules} = Source0) ->
ok = check_acl_file_rules(AclPath, Rules),
ok = write_file(AclPath, Rules),
Source1 = maps:remove(<<"rules">>, Source0),
maps:put(<<"path">>, AclPath, Source1).
maps:put(<<"path">>, AclPath, Source1);
write_acl_file(Source) ->
Source.
%% @doc where the acl.conf file is stored.
acl_conf_file() ->

View File

@ -272,9 +272,80 @@ t_update_source(_) ->
],
emqx_conf:get([authorization, sources], [])
),
?assertMatch(
[
#{type := http, enable := false},
#{type := mongodb, enable := false},
#{type := mysql, enable := false},
#{type := postgresql, enable := false},
#{type := redis, enable := false},
#{type := file, enable := false}
],
emqx_authz:lookup()
),
{ok, _} = emqx_authz:update(?CMD_REPLACE, []).
t_replace_all(_) ->
RootKey = [<<"authorization">>],
Conf = emqx:get_raw_config(RootKey),
emqx_authz_utils:update_config(RootKey, Conf#{
<<"sources">> => [
?SOURCE6, ?SOURCE5, ?SOURCE4, ?SOURCE3, ?SOURCE2, ?SOURCE1
]
}),
%% config
?assertMatch(
[
#{type := file, enable := true},
#{type := redis, enable := true},
#{type := postgresql, enable := true},
#{type := mysql, enable := true},
#{type := mongodb, enable := true},
#{type := http, enable := true}
],
emqx_conf:get([authorization, sources], [])
),
%% hooks status
?assertMatch(
[
#{type := file, enable := true},
#{type := redis, enable := true},
#{type := postgresql, enable := true},
#{type := mysql, enable := true},
#{type := mongodb, enable := true},
#{type := http, enable := true}
],
emqx_authz:lookup()
),
Ids = [http, mongodb, mysql, postgresql, redis, file],
%% metrics
lists:foreach(
fun(Id) ->
?assert(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id)
end,
Ids
),
?assertMatch(
{ok, _},
emqx_authz_utils:update_config(
RootKey,
Conf#{<<"sources">> => [?SOURCE1#{<<"enable">> => false}]}
)
),
%% hooks status
?assertMatch([#{type := http, enable := false}], emqx_authz:lookup()),
%% metrics
?assert(emqx_metrics_worker:has_metrics(authz_metrics, http)),
lists:foreach(
fun(Id) ->
?assertNot(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id)
end,
Ids -- [http]
),
ok.
t_delete_source(_) ->
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1]),

View File

@ -25,21 +25,26 @@
-define(DEFAULT_CHECK_AVAIL_TIMEOUT, 1000).
reset_authorizers() ->
reset_authorizers(deny, false).
reset_authorizers(deny, false, []).
restore_authorizers() ->
reset_authorizers(allow, true).
reset_authorizers(allow, true, []).
reset_authorizers(Nomatch, ChacheEnabled) ->
reset_authorizers(Nomatch, CacheEnabled, Source) ->
{ok, _} = emqx:update_config(
[authorization],
#{
<<"no_match">> => atom_to_binary(Nomatch),
<<"cache">> => #{<<"enable">> => atom_to_binary(ChacheEnabled)},
<<"sources">> => []
<<"cache">> => #{<<"enable">> => atom_to_binary(CacheEnabled)},
<<"sources">> => Source
}
),
ok.
%% Don't reset sources
reset_authorizers(Nomatch, CacheEnabled) ->
{ok, _} = emqx:update_config([<<"authorization">>, <<"no_match">>], Nomatch),
{ok, _} = emqx:update_config([<<"authorization">>, <<"cache">>, <<"enable">>], CacheEnabled),
ok.
setup_config(BaseConfig, SpecialParams) ->
Config = maps:merge(BaseConfig, SpecialParams),

View File

@ -39,7 +39,8 @@
disable_enable/3,
remove/2,
check_deps_and_remove/3,
list/0
list/0,
reload_hook/1
]).
-export([
@ -133,6 +134,10 @@ safe_load_bridge(Type, Name, Conf, Opts) ->
})
end.
reload_hook(Bridges) ->
ok = unload_hook(),
ok = load_hook(Bridges).
load_hook() ->
Bridges = emqx:get_config([bridges], #{}),
load_hook(Bridges).
@ -216,9 +221,9 @@ send_message(BridgeType, BridgeName, ResId, Message) ->
end.
query_opts(Config) ->
case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of
case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of
Timeout when is_integer(Timeout) orelse Timeout =:= infinity ->
%% request_timeout is configured
%% request_ttl is configured
#{timeout => Timeout};
_ ->
%% emqx_resource has a default value (15s)

View File

@ -218,7 +218,6 @@ info_example_basic(webhook) ->
resource_opts => #{
worker_pool_size => 1,
health_check_interval => 15000,
auto_restart_interval => 15000,
query_mode => async,
inflight_window => 100,
max_buffer_bytes => 100 * 1024 * 1024
@ -233,7 +232,6 @@ info_example_basic(mqtt) ->
mqtt_main_example() ->
#{
enable => true,
mode => cluster_shareload,
server => <<"127.0.0.1:1883">>,
proto_ver => <<"v4">>,
username => <<"foo">>,
@ -244,7 +242,6 @@ mqtt_main_example() ->
max_inflight => 100,
resource_opts => #{
health_check_interval => <<"15s">>,
auto_restart_interval => <<"60s">>,
query_mode => sync,
max_buffer_bytes => 100 * 1024 * 1024
},

View File

@ -69,10 +69,32 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
{ok, ConfNew}
end.
post_config_update(Path, '$remove', _, OldConf, _AppEnvs) ->
_ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf);
post_config_update(Path, _Req, NewConf, OldConf, _AppEnvs) ->
post_config_update([bridges, BridgeType, BridgeName] = Path, '$remove', _, OldConf, _AppEnvs) ->
_ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf),
ok = emqx_bridge_resource:remove(BridgeType, BridgeName),
Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([bridges])),
emqx_bridge:reload_hook(Bridges),
?tp(bridge_post_config_update_done, #{}),
ok;
post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, undefined, _AppEnvs) ->
_ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, undefined),
ResOpts = emqx_resource:fetch_creation_opts(NewConf),
ok = emqx_bridge_resource:create(BridgeType, BridgeName, NewConf, ResOpts),
Bridges = emqx_utils_maps:deep_put(
[BridgeType, BridgeName], emqx:get_config([bridges]), NewConf
),
emqx_bridge:reload_hook(Bridges),
?tp(bridge_post_config_update_done, #{}),
ok;
post_config_update([bridges, BridgeType, BridgeName] = Path, _Req, NewConf, OldConf, _AppEnvs) ->
_ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, OldConf),
ResOpts = emqx_resource:fetch_creation_opts(NewConf),
ok = emqx_bridge_resource:update(BridgeType, BridgeName, {OldConf, NewConf}, ResOpts),
Bridges = emqx_utils_maps:deep_put(
[BridgeType, BridgeName], emqx:get_config([bridges]), NewConf
),
emqx_bridge:reload_hook(Bridges),
?tp(bridge_post_config_update_done, #{}),
ok.
%% internal functions

View File

@ -327,8 +327,8 @@ parse_confs(
Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end,
RequestTimeout = emqx_utils_maps:deep_get(
[resource_opts, request_timeout],
RequestTTL = emqx_utils_maps:deep_get(
[resource_opts, request_ttl],
Conf
),
Conf#{
@ -339,7 +339,7 @@ parse_confs(
method => Method,
body => maps:get(body, Conf, undefined),
headers => Headers,
request_timeout => RequestTimeout,
request_ttl => RequestTTL,
max_retries => Retry
}
};

View File

@ -68,7 +68,6 @@ up(#{<<"connector">> := Connector} = Config) ->
Cn(password, <<>>),
Cn(clean_start, true),
Cn(keepalive, <<"60s">>),
Cn(mode, <<"cluster_shareload">>),
Cn(proto_ver, <<"v4">>),
Cn(server, undefined),
Cn(retry_interval, <<"15s">>),
@ -87,7 +86,6 @@ default_ssl() ->
default_resource_opts() ->
#{
<<"inflight_window">> => 100,
<<"auto_restart_interval">> => <<"60s">>,
<<"health_check_interval">> => <<"15s">>,
<<"max_buffer_bytes">> => <<"1GB">>,
<<"query_mode">> => <<"sync">>,

View File

@ -160,7 +160,6 @@ t_update_ssl_conf(Config) ->
<<"bridge_mode">> => false,
<<"clean_start">> => true,
<<"keepalive">> => <<"60s">>,
<<"mode">> => <<"cluster_shareload">>,
<<"proto_ver">> => <<"v4">>,
<<"server">> => <<"127.0.0.1:1883">>,
<<"ssl">> =>

View File

@ -86,8 +86,7 @@ groups() ->
SingleOnlyTests = [
t_broken_bpapi_vsn,
t_old_bpapi_vsn,
t_bridges_probe,
t_auto_restart_interval
t_bridges_probe
],
ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
[
@ -559,89 +558,6 @@ t_http_crud_apis(Config) ->
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config).
t_auto_restart_interval(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
meck:new(emqx_resource, [passthrough]),
meck:expect(emqx_resource, call_start, fun(_, _, _) -> {error, fake_error} end),
%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = ?BRIDGE_NAME,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
BridgeParams = ?HTTP_BRIDGE(URL1, Name)#{
<<"resource_opts">> => #{<<"auto_restart_interval">> => "1s"}
},
?check_trace(
begin
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
}},
request_json(
post,
uri(["bridges"]),
BridgeParams,
Config
)
),
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
{ok, _} = ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500)
end,
fun(Trace0) ->
Trace = ?of_kind(resource_auto_reconnect, Trace0),
?assertMatch([#{}], Trace),
ok
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
%% auto_retry_interval=infinity
BridgeParams1 = BridgeParams#{
<<"resource_opts">> => #{<<"auto_restart_interval">> => "infinity"}
},
?check_trace(
begin
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
}},
request_json(
post,
uri(["bridges"]),
BridgeParams1,
Config
)
),
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
?assertEqual(timeout, ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500))
end,
fun(Trace0) ->
Trace = ?of_kind(resource_auto_reconnect, Trace0),
?assertMatch([], Trace),
ok
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
meck:unload(emqx_resource).
t_http_bridges_local_topic(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
@ -1384,7 +1300,7 @@ t_metrics(Config) ->
),
ok.
%% request_timeout in bridge root should match request_timeout in
%% request_timeout in bridge root should match request_ttl in
%% resource_opts.
t_inconsistent_webhook_request_timeouts(Config) ->
Port = ?config(port, Config),
@ -1395,7 +1311,7 @@ t_inconsistent_webhook_request_timeouts(Config) ->
?HTTP_BRIDGE(URL1, Name),
#{
<<"request_timeout">> => <<"1s">>,
<<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
<<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>}
}
),
%% root request_timeout is deprecated for bridge.
@ -1410,8 +1326,8 @@ t_inconsistent_webhook_request_timeouts(Config) ->
Config
),
?assertNot(maps:is_key(<<"request_timeout">>, Response)),
?assertMatch(#{<<"request_timeout">> := <<"2s">>}, ResourceOpts),
validate_resource_request_timeout(proplists:get_value(group, Config), 2000, Name),
?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts),
validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name),
ok.
t_cluster_later_join_metrics(Config) ->
@ -1452,7 +1368,7 @@ t_cluster_later_join_metrics(Config) ->
),
ok.
validate_resource_request_timeout(single, Timeout, Name) ->
validate_resource_request_ttl(single, Timeout, Name) ->
SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
@ -1472,7 +1388,7 @@ validate_resource_request_timeout(single, Timeout, Name) ->
ok
end
);
validate_resource_request_timeout(_Cluster, _Timeout, _Name) ->
validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
ignore.
%%

View File

@ -71,7 +71,7 @@ webhook_config_test() ->
}
}
} = check(Conf3),
?assertMatch(#{<<"request_timeout">> := infinity}, ResourceOpts),
?assertMatch(#{<<"request_ttl">> := infinity}, ResourceOpts),
ok.
up(#{<<"bridges">> := Bridges0} = Conf0) ->

View File

@ -167,7 +167,7 @@ bridge_async_config(#{port := Port} = Config) ->
ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000),
ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"),
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
@ -182,11 +182,10 @@ bridge_async_config(#{port := Port} = Config) ->
" body = \"${id}\""
" resource_opts {\n"
" inflight_window = 100\n"
" auto_restart_interval = \"60s\"\n"
" health_check_interval = \"15s\"\n"
" max_buffer_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n"
" request_timeout = \"~p\"\n"
" request_ttl = \"~p\"\n"
" resume_interval = \"~s\"\n"
" start_after_created = \"true\"\n"
" start_timeout = \"5s\"\n"
@ -204,7 +203,7 @@ bridge_async_config(#{port := Port} = Config) ->
PoolSize,
RequestTimeout,
QueryMode,
ResourceRequestTimeout,
ResourceRequestTTL,
ResumeInterval
]
),
@ -247,7 +246,7 @@ t_send_async_connection_timeout(_Config) ->
query_mode => "async",
connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000,
resource_request_timeout => "infinity"
resource_request_ttl => "infinity"
}),
NumberOfMessagesToSend = 10,
[
@ -269,7 +268,7 @@ t_async_free_retries(_Config) ->
query_mode => "sync",
connect_timeout => 1_000,
request_timeout => 10_000,
resource_request_timeout => "10000s"
resource_request_ttl => "10000s"
}),
%% Fail 5 times then succeed.
Context = #{error_attempts => 5},
@ -295,7 +294,7 @@ t_async_common_retries(_Config) ->
resume_interval => "100ms",
connect_timeout => 1_000,
request_timeout => 10_000,
resource_request_timeout => "10000s"
resource_request_ttl => "10000s"
}),
%% Keeps failing until connector gives up.
Context = #{error_attempts => infinity},

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_cassandra, [
{description, "EMQX Enterprise Cassandra Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib, ecql]},
{env, []},

View File

@ -59,7 +59,6 @@ values(_Method, Type) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync,

View File

@ -480,6 +480,8 @@ prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Co
handle_result({error, disconnected}) ->
{error, {recoverable_error, disconnected}};
handle_result({error, ecpool_empty}) ->
{error, {recoverable_error, ecpool_empty}};
handle_result({error, Error}) ->
{error, {unrecoverable_error, Error}};
handle_result(Res) ->

View File

@ -218,7 +218,7 @@ cassa_config(BridgeType, Config) ->
" password = ~p\n"
" cql = ~p\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"
@ -511,7 +511,6 @@ t_write_failure(Config) ->
#{
<<"resource_opts">> =>
#{
<<"auto_restart_interval">> => <<"100ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}
@ -636,7 +635,7 @@ t_bad_sql_parameter(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"request_timeout">> => 500,
<<"request_ttl">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
}

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_clickhouse, [
{description, "EMQX Enterprise ClickHouse Bridge"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [kernel, stdlib, clickhouse, emqx_resource]},
{env, []},

View File

@ -56,7 +56,6 @@ values(_Method, Type) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,

View File

@ -464,7 +464,12 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
sql => SQL,
reason => ClickhouseErrorResult
}),
{error, ClickhouseErrorResult}.
case ClickhouseErrorResult of
{error, ecpool_empty} ->
{error, {recoverable_error, ecpool_empty}};
_ ->
{error, ClickhouseErrorResult}
end.
snabbkaffe_log_return(_Result) ->
?tp(

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_dynamo, [
{description, "EMQX Enterprise Dynamo Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib, erlcloud]},
{env, []},

View File

@ -52,7 +52,6 @@ values(_Method) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync,

View File

@ -170,7 +170,12 @@ do_query(
query => Query,
reason => Reason
}),
Result;
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
_ ->
?tp(
dynamo_connector_query_return,

View File

@ -170,7 +170,7 @@ dynamo_config(BridgeType, Config) ->
" aws_access_key_id = ~p\n"
" aws_secret_access_key = ~p\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,

View File

@ -32,7 +32,7 @@
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
pubsub_topic := binary(),
resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()},
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
service_account_json := service_account_json(),
any() => term()
}.
@ -44,7 +44,7 @@
pool_name := binary(),
project_id := binary(),
pubsub_topic := binary(),
request_timeout := timer:time()
request_ttl := infinity | timer:time()
}.
-type headers() :: [{binary(), iodata()}].
-type body() :: iodata().
@ -69,7 +69,7 @@ on_start(
payload_template := PayloadTemplate,
pool_size := PoolSize,
pubsub_topic := PubSubTopic,
resource_opts := #{request_timeout := RequestTimeout}
resource_opts := #{request_ttl := RequestTTL}
} = Config
) ->
?SLOG(info, #{
@ -108,7 +108,7 @@ on_start(
pool_name => ResourceId,
project_id => ProjectId,
pubsub_topic => PubSubTopic,
request_timeout => RequestTimeout
request_ttl => RequestTTL
},
?tp(
gcp_pubsub_on_start_before_starting_pool,
@ -344,7 +344,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
#{
pool_name := PoolName,
max_retries := MaxRetries,
request_timeout := RequestTimeout
request_ttl := RequestTTL
} = State,
?tp(
gcp_pubsub_bridge_do_send_requests,
@ -371,7 +371,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
PoolName,
Method,
Request,
RequestTimeout,
RequestTTL,
MaxRetries
)
of
@ -467,7 +467,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
#{
pool_name := PoolName,
request_timeout := RequestTimeout
request_ttl := RequestTTL
} = State,
?tp(
gcp_pubsub_bridge_do_send_requests,
@ -494,7 +494,7 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
Worker,
Method,
Request,
RequestTimeout,
RequestTTL,
{fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]}
),
{ok, Worker}.

View File

@ -287,7 +287,7 @@ gcp_pubsub_config(Config) ->
" pool_size = 1\n"
" pipelining = ~b\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" metrics_flush_interval = 700ms\n"
" worker_pool_size = 1\n"
" query_mode = ~s\n"
@ -627,7 +627,7 @@ t_publish_success_infinity_timeout(Config) ->
ServiceAccountJSON = ?config(service_account_json, Config),
Topic = <<"t/topic">>,
{ok, _} = create_bridge(Config, #{
<<"resource_opts">> => #{<<"request_timeout">> => <<"infinity">>}
<<"resource_opts">> => #{<<"request_ttl">> => <<"infinity">>}
}),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),

View File

@ -277,7 +277,7 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" precision = ns\n"
" write_syntax = \"~s\"\n"
" resource_opts = {\n"
" request_timeout = 1s\n"
" request_ttl = 1s\n"
" query_mode = ~s\n"
" batch_size = ~b\n"
" }\n"
@ -314,7 +314,7 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" precision = ns\n"
" write_syntax = \"~s\"\n"
" resource_opts = {\n"
" request_timeout = 1s\n"
" request_ttl = 1s\n"
" query_mode = ~s\n"
" batch_size = ~b\n"
" }\n"

View File

@ -227,7 +227,6 @@ conn_bridge_example(_Method, Type) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
query_mode => async,
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
}

View File

@ -135,8 +135,8 @@ bridge_config(TestCase, _TestGroup, Config) ->
"iotdb_version = \"~s\"\n"
" pool_size = 1\n"
" resource_opts = {\n"
" auto_restart_interval = 5000\n"
" request_timeout = 30000\n"
" health_check_interval = 5000\n"
" request_ttl = 30000\n"
" query_mode = \"async\"\n"
" worker_pool_size = 1\n"
" }\n"

View File

@ -115,14 +115,6 @@ end_per_testcase(_TestCase, _Config) ->
delete_all_bridges(),
ok.
set_special_configs(emqx_management) ->
Listeners = #{http => #{port => 8081}},
Config = #{
listeners => Listeners,
applications => [#{id => "admin", secret => "public"}]
},
emqx_config:put([emqx_management], Config),
ok;
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config(),
ok;

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,

View File

@ -755,11 +755,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"worker_pool_size">> => 2,
<<"query_mode">> => <<"sync">>,
%% using a long time so we can test recovery
<<"request_timeout">> => <<"15s">>,
%% to make it check the healthy quickly
<<"health_check_interval">> => <<"0.5s">>,
%% to make it reconnect quickly
<<"auto_restart_interval">> => <<"1s">>
<<"request_ttl">> => <<"15s">>,
%% to make it check the healthy and reconnect quickly
<<"health_check_interval">> => <<"0.5s">>
}
}
),
@ -865,11 +863,9 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
<<"worker_pool_size">> => 2,
<<"query_mode">> => <<"async">>,
%% using a long time so we can test recovery
<<"request_timeout">> => <<"15s">>,
%% to make it check the healthy quickly
<<"health_check_interval">> => <<"0.5s">>,
%% to make it reconnect quickly
<<"auto_restart_interval">> => <<"1s">>
<<"request_ttl">> => <<"15s">>,
%% to make it check the healthy and reconnect quickly
<<"health_check_interval">> => <<"0.5s">>
}
}
),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_opents, [
{description, "EMQX Enterprise OpenTSDB Bridge"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,

View File

@ -42,7 +42,6 @@ values(_Method) ->
resource_opts => #{
worker_pool_size => 1,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,

View File

@ -142,7 +142,12 @@ do_query(InstanceId, Query, #{pool_name := PoolName} = State) ->
query => Query,
reason => Reason
}),
Result;
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
_ ->
?tp(
opents_connector_query_return,

View File

@ -127,7 +127,7 @@ opents_config(BridgeType, Config) ->
" enable = true\n"
" server = ~p\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = sync\n"
" }\n"
@ -298,7 +298,7 @@ t_write_timeout(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"request_timeout">> => 500,
<<"request_ttl">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
}

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_oracle, [
{description, "EMQX Enterprise Oracle Database Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,

View File

@ -16,7 +16,8 @@
namespace/0,
roots/0,
fields/1,
desc/1
desc/1,
config_validator/1
]).
-define(DEFAULT_SQL, <<
@ -50,7 +51,6 @@ values(_Method) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,
@ -107,3 +107,12 @@ type_field(Type) ->
name_field() ->
{name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
config_validator(#{<<"server">> := Server} = Config) when
not is_map(Server) andalso
not is_map_key(<<"sid">>, Config) andalso
not is_map_key(<<"service_name">>, Config)
->
{error, "neither SID nor Service Name was set"};
config_validator(_) ->
ok.

View File

@ -203,11 +203,9 @@ oracle_config(TestCase, _ConnectionType, Config) ->
" pool_size = 1\n"
" sql = \"~s\"\n"
" resource_opts = {\n"
" auto_restart_interval = \"5s\"\n"
" health_check_interval = \"5s\"\n"
" request_timeout = \"30s\"\n"
" request_ttl = \"30s\"\n"
" query_mode = \"async\"\n"
" enable_batch = true\n"
" batch_size = 3\n"
" batch_time = \"3s\"\n"
" worker_pool_size = 1\n"
@ -517,3 +515,15 @@ t_on_get_status(Config) ->
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
ok.
t_no_sid_nor_service_name(Config0) ->
OracleConfig0 = ?config(oracle_config, Config0),
OracleConfig1 = maps:remove(<<"sid">>, OracleConfig0),
OracleConfig = maps:remove(<<"service_name">>, OracleConfig1),
NewOracleConfig = {oracle_config, OracleConfig},
Config = lists:keyreplace(oracle_config, 1, Config0, NewOracleConfig),
?assertMatch(
{error, #{kind := validation_error, reason := "neither SID nor Service Name was set"}},
create_bridge(Config)
),
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pgsql, [
{description, "EMQX Enterprise PostgreSQL Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib]},
{env, []},

View File

@ -55,7 +55,6 @@ values(_Method, Type) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,

View File

@ -193,7 +193,7 @@ pgsql_config(BridgeType, Config) ->
" password = ~p\n"
" sql = ~p\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"
@ -503,7 +503,6 @@ t_write_timeout(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"auto_restart_interval">> => <<"100ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}

View File

@ -154,8 +154,7 @@ fields(producer_resource_opts) ->
health_check_interval,
resume_interval,
start_after_created,
start_timeout,
auto_restart_interval
start_timeout
],
lists:filtermap(
fun

View File

@ -1040,7 +1040,7 @@ t_resource_manager_crash_before_producers_started(Config) ->
end),
%% even if the resource manager is dead, we can still
%% clear the allocated resources.
{{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
{{error, {config_update_crashed, _}}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},

View File

@ -57,7 +57,6 @@ values(_Method, Type) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,

View File

@ -431,11 +431,12 @@ on_query(
state => emqx_utils:redact(State)
}),
MessageData = format_data(PayloadTemplate, Data),
ecpool:pick_and_do(
Res = ecpool:pick_and_do(
PoolName,
{?MODULE, publish_messages, [Config, [MessageData]]},
no_handover
).
),
handle_result(Res).
%% emqx_resource callback that is called when a batch query is received
@ -467,11 +468,12 @@ on_batch_query(
|| Data <- MessagesToInsert
],
%% Publish the messages
ecpool:pick_and_do(
Res = ecpool:pick_and_do(
PoolName,
{?MODULE, publish_messages, [Config, FormattedMessages]},
no_handover
).
),
handle_result(Res).
publish_messages(
{_Connection, Channel},
@ -543,3 +545,8 @@ format_data([], Msg) ->
emqx_utils_json:encode(Msg);
format_data(Tokens, Msg) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg).
handle_result({error, ecpool_empty}) ->
{error, {recoverable_error, ecpool_empty}};
handle_result(Res) ->
Res.

View File

@ -52,7 +52,6 @@ values(post) ->
resource_opts => #{
worker_pool_size => 1,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync,

View File

@ -13,6 +13,9 @@
% Bridge defaults
-define(TOPIC, "TopicTest").
-define(DENY_TOPIC, "DENY_TOPIC").
-define(ACCESS_KEY, "RocketMQ").
-define(SECRET_KEY, "12345678").
-define(BATCH_SIZE, 10).
-define(PAYLOAD, <<"HELLO">>).
@ -25,17 +28,19 @@
all() ->
[
{group, async},
{group, sync}
{group, sync},
{group, acl}
].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
TCs = emqx_common_test_helpers:all(?MODULE) -- [t_acl_deny],
BatchingGroups = [{group, with_batch}, {group, without_batch}],
[
{async, BatchingGroups},
{sync, BatchingGroups},
{with_batch, TCs},
{without_batch, TCs}
{without_batch, TCs},
{acl, [t_acl_deny]}
].
init_per_group(async, Config) ->
@ -48,6 +53,9 @@ init_per_group(with_batch, Config0) ->
init_per_group(without_batch, Config0) ->
Config = [{batch_size, 1} | Config0],
common_init(Config);
init_per_group(acl, Config0) ->
Config = [{batch_size, 1}, {query_mode, sync} | Config0],
common_init(Config);
init_per_group(_Group, Config) ->
Config.
@ -137,9 +145,11 @@ rocketmq_config(BridgeType, Config) ->
"bridges.~s.~s {\n"
" enable = true\n"
" servers = ~p\n"
" access_key = ~p\n"
" secret_key = ~p\n"
" topic = ~p\n"
" resource_opts = {\n"
" request_timeout = 1500ms\n"
" request_ttl = 1500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"
@ -148,6 +158,8 @@ rocketmq_config(BridgeType, Config) ->
BridgeType,
Name,
Server,
?ACCESS_KEY,
?SECRET_KEY,
?TOPIC,
BatchSize,
QueryMode
@ -271,3 +283,29 @@ t_simple_query(Config) ->
Result = query_resource(Config, Request),
?assertEqual(ok, Result),
ok.
t_acl_deny(Config0) ->
RocketCfg = ?GET_CONFIG(rocketmq_config, Config0),
RocketCfg2 = RocketCfg#{<<"topic">> := ?DENY_TOPIC},
Config = lists:keyreplace(rocketmq_config, 1, Config0, {rocketmq_config, RocketCfg2}),
?assertMatch(
{ok, _},
create_bridge(Config)
),
SentData = #{payload => ?PAYLOAD},
?check_trace(
begin
?wait_async_action(
?assertMatch({error, #{<<"code">> := 1}}, send_message(Config, SentData)),
#{?snk_kind := rocketmq_connector_query_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
?assertMatch([#{error := #{<<"code">> := 1}}], Trace),
ok
end
),
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_sqlserver, [
{description, "EMQX Enterprise SQL Server Bridge"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [kernel, stdlib, odbc]},
{env, []},

View File

@ -56,7 +56,6 @@ values(post) ->
resource_opts => #{
worker_pool_size => 1,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,

View File

@ -55,8 +55,8 @@
default_port => ?SQLSERVER_DEFAULT_PORT
}).
-define(REQUEST_TIMEOUT(RESOURCE_OPTS),
maps:get(request_timeout, RESOURCE_OPTS, ?DEFAULT_REQUEST_TIMEOUT)
-define(REQUEST_TTL(RESOURCE_OPTS),
maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL)
).
-define(BATCH_INSERT_TEMP, batch_insert_temp).
@ -336,6 +336,7 @@ conn_str([{_, _} | Opts], Acc) ->
) ->
{ok, list()}
| {error, {recoverable_error, term()}}
| {error, {unrecoverable_error, term()}}
| {error, term()}.
do_query(
ResourceId,
@ -374,7 +375,12 @@ do_query(
query => Query,
reason => Reason
}),
Result;
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
_ ->
?tp(
sqlserver_connector_query_return,
@ -388,7 +394,7 @@ worker_do_insert(
) ->
LogMeta = #{connector => ResourceId, state => State},
try
case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of
case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of
{selected, Rows, _} ->
{ok, Rows};
{updated, _} ->

View File

@ -461,7 +461,7 @@ sqlserver_config(BridgeType, Config) ->
" sql = ~p\n"
" driver = ~p\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" worker_pool_size = ~b\n"

View File

@ -54,7 +54,6 @@ values(_Method) ->
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync,

View File

@ -200,7 +200,12 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
job => Job,
reason => Reason
}),
Result;
case Reason of
ecpool_empty ->
{error, {recoverable_error, Reason}};
_ ->
Result
end;
_ ->
?tp(
tdengine_connector_query_return,

View File

@ -190,7 +190,7 @@ tdengine_config(BridgeType, Config) ->
" password = ~p\n"
" sql = ~p\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"
@ -456,7 +456,7 @@ t_write_timeout(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"request_timeout">> => 500,
<<"request_ttl">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
}

View File

@ -18,16 +18,40 @@
-export([
load/0,
admins/1,
conf/1,
unload/0
]).
-define(CMD, cluster_call).
-define(CLUSTER_CALL, cluster_call).
-define(CONF, conf).
load() ->
emqx_ctl:register_command(?CMD, {?MODULE, admins}, []).
emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, []),
emqx_ctl:register_command(?CONF, {?MODULE, conf}, []).
unload() ->
emqx_ctl:unregister_command(?CMD).
emqx_ctl:unregister_command(?CLUSTER_CALL),
emqx_ctl:unregister_command(?CONF).
conf(["show", "--keys-only"]) ->
print(emqx_config:get_root_names());
conf(["show"]) ->
print_hocon(get_config());
conf(["show", Key]) ->
print_hocon(get_config(Key));
conf(["load", Path]) ->
load_config(Path);
conf(_) ->
emqx_ctl:usage(
[
%% TODO add reload
%{"conf reload", "reload etc/emqx.conf on local node"},
{"conf show --keys-only", "print all keys"},
{"conf show", "print all running configures"},
{"conf show <key>", "print a specific configuration"},
{"conf load <path>", "load a hocon file to all nodes"}
]
).
admins(["status"]) ->
status();
@ -43,7 +67,7 @@ admins(["skip", Node0]) ->
status();
admins(["tnxid", TnxId0]) ->
TnxId = list_to_integer(TnxId0),
emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]);
print(emqx_cluster_rpc:query(TnxId));
admins(["fast_forward"]) ->
status(),
Nodes = mria:running_nodes(),
@ -91,3 +115,30 @@ status() ->
Status
),
emqx_ctl:print("-----------------------------------------------\n").
print(Json) ->
emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Json)]).
print_hocon(Hocon) ->
emqx_ctl:print("~ts~n", [hocon_pp:do(Hocon, #{})]).
get_config() -> emqx_config:fill_defaults(emqx:get_raw_config([])).
get_config(Key) -> emqx_config:fill_defaults(#{Key => emqx:get_raw_config([Key])}).
-define(OPTIONS, #{rawconf_with_defaults => true, override_to => cluster}).
load_config(Path) ->
case hocon:files([Path]) of
{ok, Conf} ->
maps:foreach(
fun(Key, Value) ->
case emqx_conf:update([Key], Value, ?OPTIONS) of
{ok, _} -> emqx_ctl:print("load ~ts ok~n", [Key]);
{error, Reason} -> emqx_ctl:print("load ~ts failed: ~p~n", [Key, Reason])
end
end,
Conf
);
{error, Reason} ->
emqx_ctl:print("load ~ts failed~n~p~n", [Path, Reason]),
{error, bad_hocon_file}
end.

Some files were not shown because too many files have changed in this diff Show More