Merge pull request #10852 from zmstone/0529-prepare-for-v5.0.26-alpha.1

0529 prepare for v5.0.26 alpha.1
This commit is contained in:
Zaiming (Stone) Shi 2023-05-29 10:12:53 +02:00 committed by GitHub
commit 6d264bb79c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
261 changed files with 3241 additions and 2462 deletions

View File

@ -25,7 +25,7 @@ jobs:
prepare:
runs-on: ubuntu-22.04
# prepare source with any OTP version, no need for a matrix
container: "ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-24.3.4.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-24.3.4.2-3-ubuntu22.04"
outputs:
PROFILE: ${{ steps.get_profile.outputs.PROFILE }}
@ -121,7 +121,7 @@ jobs:
# NOTE: 'otp' and 'elixir' are to configure emqx-builder image
# only support latest otp and elixir, not a matrix
builder:
- 5.0-34 # update to latest
- 5.0-35 # update to latest
otp:
- 24.3.4.2-3 # switch to 25 once ready to release 5.1
elixir:

View File

@ -21,7 +21,7 @@ on:
jobs:
prepare:
runs-on: ubuntu-22.04
container: ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-24.3.4.2-3-ubuntu22.04
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-24.3.4.2-3-ubuntu22.04
outputs:
BUILD_PROFILE: ${{ steps.get_profile.outputs.BUILD_PROFILE }}
IS_EXACT_TAG: ${{ steps.get_profile.outputs.IS_EXACT_TAG }}
@ -102,9 +102,16 @@ jobs:
- name: run emqx
timeout-minutes: 5
run: |
$ErrorActionPreference = "Stop"
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx start
Start-Sleep -s 5
echo "EMQX started"
Start-Sleep -s 10
$pingOutput = ./_build/${{ matrix.profile }}/rel/emqx/bin/emqx ping
if ($pingOutput = 'pong') {
echo "EMQX started OK"
} else {
echo "Failed to ping EMQX $pingOutput"
Exit 1
}
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx stop
echo "EMQX stopped"
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx install
@ -184,7 +191,7 @@ jobs:
- aws-arm64
- ubuntu-22.04
builder:
- 5.0-34
- 5.0-35
elixir:
- 1.13.4
exclude:
@ -198,7 +205,7 @@ jobs:
arch: amd64
os: ubuntu22.04
build_machine: ubuntu-22.04
builder: 5.0-34
builder: 5.0-35
elixir: 1.13.4
release_with: elixir
- profile: emqx
@ -206,7 +213,7 @@ jobs:
arch: amd64
os: amzn2
build_machine: ubuntu-22.04
builder: 5.0-34
builder: 5.0-35
elixir: 1.13.4
release_with: elixir
@ -306,35 +313,3 @@ jobs:
fi
aws s3 cp --recursive packages/$PROFILE s3://${{ secrets.AWS_S3_BUCKET }}/$s3dir/${{ github.ref_name }}
aws cloudfront create-invalidation --distribution-id ${{ secrets.AWS_CLOUDFRONT_ID }} --paths "/$s3dir/${{ github.ref_name }}/*"
- name: Push to packagecloud.io
env:
PROFILE: ${{ matrix.profile }}
VERSION: ${{ needs.prepare.outputs.VERSION }}
PACKAGECLOUD_TOKEN: ${{ secrets.PACKAGECLOUD_TOKEN }}
run: |
set -eu
REPO=$PROFILE
if [ $PROFILE = 'emqx-enterprise' ]; then
REPO='emqx-enterprise5'
fi
function push() {
docker run -t --rm -e PACKAGECLOUD_TOKEN=$PACKAGECLOUD_TOKEN -v $(pwd)/$2:/w/$2 -w /w ghcr.io/emqx/package_cloud push emqx/$REPO/$1 $2
}
push "debian/buster" "packages/$PROFILE/$PROFILE-$VERSION-debian10-amd64.deb"
push "debian/buster" "packages/$PROFILE/$PROFILE-$VERSION-debian10-arm64.deb"
push "debian/bullseye" "packages/$PROFILE/$PROFILE-$VERSION-debian11-amd64.deb"
push "debian/bullseye" "packages/$PROFILE/$PROFILE-$VERSION-debian11-arm64.deb"
push "ubuntu/bionic" "packages/$PROFILE/$PROFILE-$VERSION-ubuntu18.04-amd64.deb"
push "ubuntu/bionic" "packages/$PROFILE/$PROFILE-$VERSION-ubuntu18.04-arm64.deb"
push "ubuntu/focal" "packages/$PROFILE/$PROFILE-$VERSION-ubuntu20.04-amd64.deb"
push "ubuntu/focal" "packages/$PROFILE/$PROFILE-$VERSION-ubuntu20.04-arm64.deb"
push "ubuntu/jammy" "packages/$PROFILE/$PROFILE-$VERSION-ubuntu22.04-amd64.deb"
push "ubuntu/jammy" "packages/$PROFILE/$PROFILE-$VERSION-ubuntu22.04-arm64.deb"
push "el/6" "packages/$PROFILE/$PROFILE-$VERSION-amzn2-amd64.rpm"
push "el/6" "packages/$PROFILE/$PROFILE-$VERSION-amzn2-arm64.rpm"
push "el/7" "packages/$PROFILE/$PROFILE-$VERSION-el7-amd64.rpm"
push "el/7" "packages/$PROFILE/$PROFILE-$VERSION-el7-arm64.rpm"
push "el/8" "packages/$PROFILE/$PROFILE-$VERSION-el8-amd64.rpm"
push "el/8" "packages/$PROFILE/$PROFILE-$VERSION-el8-arm64.rpm"
push "el/9" "packages/$PROFILE/$PROFILE-$VERSION-el9-amd64.rpm"
push "el/9" "packages/$PROFILE/$PROFILE-$VERSION-el9-arm64.rpm"

View File

@ -24,9 +24,6 @@ jobs:
profile:
- ['emqx', 'master']
- ['emqx-enterprise', 'release-50']
branch:
- master
- release-50
otp:
- 24.3.4.2-3
arch:
@ -35,7 +32,7 @@ jobs:
- debian10
- amzn2
builder:
- 5.0-34
- 5.0-35
elixir:
- 1.13.4

View File

@ -35,7 +35,7 @@ jobs:
- ["emqx-enterprise", "24.3.4.2-3", "amzn2", "erlang"]
- ["emqx-enterprise", "25.1.2-3", "ubuntu20.04", "erlang"]
builder:
- 5.0-34
- 5.0-35
elixir:
- '1.13.4'
@ -111,8 +111,14 @@ jobs:
timeout-minutes: 5
run: |
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx start
Start-Sleep -s 5
echo "EMQX started"
Start-Sleep -s 10
$pingOutput = ./_build/${{ matrix.profile }}/rel/emqx/bin/emqx ping
if ($pingOutput = 'pong') {
echo "EMQX started OK"
} else {
echo "Failed to ping EMQX $pingOutput"
Exit 1
}
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx stop
echo "EMQX stopped"
./_build/${{ matrix.profile }}/rel/emqx/bin/emqx install

View File

@ -6,7 +6,7 @@ on:
jobs:
check_deps_integrity:
runs-on: ubuntu-22.04
container: ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-25.1.2-3-ubuntu22.04
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu22.04
steps:
- uses: actions/checkout@v3

View File

@ -5,7 +5,7 @@ on: [pull_request]
jobs:
code_style_check:
runs-on: ubuntu-22.04
container: "ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-25.1.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu22.04"
steps:
- uses: actions/checkout@v3
with:

View File

@ -9,7 +9,7 @@ jobs:
elixir_apps_check:
runs-on: ubuntu-22.04
# just use the latest builder
container: "ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-25.1.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu22.04"
strategy:
fail-fast: false

View File

@ -8,7 +8,7 @@ on:
jobs:
elixir_deps_check:
runs-on: ubuntu-22.04
container: ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-25.1.2-3-ubuntu22.04
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu22.04
steps:
- name: Checkout

View File

@ -17,7 +17,7 @@ jobs:
profile:
- emqx
- emqx-enterprise
container: ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-25.1.2-3-ubuntu22.04
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu22.04
steps:
- name: Checkout
uses: actions/checkout@v3

View File

@ -15,7 +15,7 @@ jobs:
prepare:
runs-on: ubuntu-latest
if: github.repository_owner == 'emqx'
container: ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-25.1.2-3-ubuntu20.04
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-25.1.2-3-ubuntu20.04
outputs:
BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }}
PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }}
@ -51,11 +51,10 @@ jobs:
needs:
- prepare
env:
TF_VAR_bench_id: ${{ needs.prepare.outputs.BENCH_ID }}
TF_VAR_package_file: ${{ needs.prepare.outputs.PACKAGE_FILE }}
TF_VAR_test_duration: 300
TF_VAR_grafana_api_key: ${{ secrets.TF_EMQX_PERF_TEST_GRAFANA_API_KEY }}
TF_AWS_REGION: eu-north-1
TF_VAR_test_duration: 1800
steps:
- name: Configure AWS Credentials
@ -77,38 +76,37 @@ jobs:
uses: hashicorp/setup-terraform@v2
with:
terraform_wrapper: false
- name: terraform init
- name: 1on1 scenario
id: scenario_1on1
working-directory: ./tf-emqx-performance-test
timeout-minutes: 60
env:
TF_VAR_bench_id: "${{ needs.prepare.outputs.BENCH_ID }}/1on1"
TF_VAR_use_emqttb: 1
TF_VAR_use_emqtt_bench: 0
TF_VAR_emqttb_instance_count: 2
TF_VAR_emqttb_instance_type: "c5.large"
TF_VAR_emqttb_scenario: "@pub --topic 't/%n' --pubinterval 10ms --qos 1 --publatency 50ms --size 16 --num-clients 25000 @sub --topic 't/%n' --num-clients 25000"
TF_VAR_emqx_instance_type: "c5.xlarge"
TF_VAR_emqx_instance_count: 3
run: |
terraform init
- name: terraform apply
working-directory: ./tf-emqx-performance-test
run: |
terraform apply -auto-approve
- name: Wait for test results
timeout-minutes: 30
working-directory: ./tf-emqx-performance-test
id: test-results
run: |
sleep $TF_VAR_test_duration
until aws s3api head-object --bucket tf-emqx-performance-test --key "$TF_VAR_bench_id/DONE" > /dev/null 2>&1
do
printf '.'
sleep 10
done
echo
aws s3 cp "s3://tf-emqx-performance-test/$TF_VAR_bench_id/metrics.json" ./
aws s3 cp "s3://tf-emqx-performance-test/$TF_VAR_bench_id/stats.json" ./
echo MESSAGES_DELIVERED=$(cat metrics.json | jq '[.[]."messages.delivered"] | add') >> $GITHUB_OUTPUT
echo MESSAGES_DROPPED=$(cat metrics.json | jq '[.[]."messages.dropped"] | add') >> $GITHUB_OUTPUT
./wait-emqttb.sh
./fetch-metrics.sh
MESSAGES_RECEIVED=$(cat metrics.json | jq '[.[]."messages.received"] | add')
MESSAGES_SENT=$(cat metrics.json | jq '[.[]."messages.sent"] | add')
echo MESSAGES_DROPPED=$(cat metrics.json | jq '[.[]."messages.dropped"] | add') >> $GITHUB_OUTPUT
echo PUB_MSG_RATE=$(($MESSAGES_RECEIVED / $TF_VAR_test_duration)) >> $GITHUB_OUTPUT
echo SUB_MSG_RATE=$(($MESSAGES_SENT / $TF_VAR_test_duration)) >> $GITHUB_OUTPUT
terraform destroy -auto-approve
- name: Send notification to Slack
if: success()
uses: slackapi/slack-github-action@v1.23.0
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
with:
payload: |
{"text": "EMQX performance test completed.\nMessages delivered: ${{ steps.test-results.outputs.MESSAGES_DELIVERED }}.\nMessages dropped: ${{ steps.test-results.outputs.MESSAGES_DROPPED }}.\nhttps://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}"}
{"text": "Performance test result for 1on1 scenario (50k pub, 50k sub): ${{ job.status }}\nhttps://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}\n*Pub message rate*: ${{ steps.scenario_1on1.outputs.PUB_MSG_RATE }}\n*Sub message rate*: ${{ steps.scenario_1on1.outputs.SUB_MSG_RATE }}\nDropped messages: ${{ steps.scenario_1on1.outputs.MESSAGES_DROPPED }}"}
- name: terraform destroy
if: always()
working-directory: ./tf-emqx-performance-test
@ -117,10 +115,10 @@ jobs:
- uses: actions/upload-artifact@v3
if: success()
with:
name: test-results
path: "./tf-emqx-performance-test/*.json"
name: metrics
path: "./tf-emqx-performance-test/metrics.json"
- uses: actions/upload-artifact@v3
if: always()
if: failure()
with:
name: terraform
path: |

View File

@ -15,7 +15,7 @@ on:
jobs:
upload:
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
strategy:
fail-fast: false
steps:
@ -53,16 +53,6 @@ jobs:
BUCKET=${{ secrets.AWS_S3_BUCKET }}
OUTPUT_DIR=${{ steps.profile.outputs.s3dir }}
aws s3 cp --recursive s3://$BUCKET/$OUTPUT_DIR/${{ github.ref_name }} packages
cd packages
DEFAULT_BEAM_PLATFORM='otp24.3.4.2-3'
# all packages including full-name and default-name are uploaded to s3
# but we only upload default-name packages (and elixir) as github artifacts
# so we rename (overwrite) non-default packages before uploading
while read -r fname; do
default_fname=$(echo "$fname" | sed "s/-${DEFAULT_BEAM_PLATFORM}//g")
echo "$fname -> $default_fname"
mv -f "$fname" "$default_fname"
done < <(find . -maxdepth 1 -type f | grep -E "emqx(-enterprise)?-5\.[0-9]+\.[0-9]+.*-${DEFAULT_BEAM_PLATFORM}" | grep -v elixir)
- uses: alexellis/upload-assets@0.4.0
env:
GITHUB_TOKEN: ${{ github.token }}
@ -79,3 +69,35 @@ jobs:
-X POST \
-d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ github.ref_name }}\" }" \
${{ secrets.EMQX_IO_RELEASE_API }}
- name: Push to packagecloud.io
env:
PROFILE: ${{ steps.profile.outputs.profile }}
VERSION: ${{ steps.profile.outputs.version }}
PACKAGECLOUD_TOKEN: ${{ secrets.PACKAGECLOUD_TOKEN }}
run: |
set -eu
REPO=$PROFILE
if [ $PROFILE = 'emqx-enterprise' ]; then
REPO='emqx-enterprise5'
fi
function push() {
docker run -t --rm -e PACKAGECLOUD_TOKEN=$PACKAGECLOUD_TOKEN -v $(pwd)/$2:/w/$2 -w /w ghcr.io/emqx/package_cloud push emqx/$REPO/$1 $2
}
push "debian/buster" "packages/$PROFILE-$VERSION-debian10-amd64.deb"
push "debian/buster" "packages/$PROFILE-$VERSION-debian10-arm64.deb"
push "debian/bullseye" "packages/$PROFILE-$VERSION-debian11-amd64.deb"
push "debian/bullseye" "packages/$PROFILE-$VERSION-debian11-arm64.deb"
push "ubuntu/bionic" "packages/$PROFILE-$VERSION-ubuntu18.04-amd64.deb"
push "ubuntu/bionic" "packages/$PROFILE-$VERSION-ubuntu18.04-arm64.deb"
push "ubuntu/focal" "packages/$PROFILE-$VERSION-ubuntu20.04-amd64.deb"
push "ubuntu/focal" "packages/$PROFILE-$VERSION-ubuntu20.04-arm64.deb"
push "ubuntu/jammy" "packages/$PROFILE-$VERSION-ubuntu22.04-amd64.deb"
push "ubuntu/jammy" "packages/$PROFILE-$VERSION-ubuntu22.04-arm64.deb"
push "el/6" "packages/$PROFILE-$VERSION-amzn2-amd64.rpm"
push "el/6" "packages/$PROFILE-$VERSION-amzn2-arm64.rpm"
push "el/7" "packages/$PROFILE-$VERSION-el7-amd64.rpm"
push "el/7" "packages/$PROFILE-$VERSION-el7-arm64.rpm"
push "el/8" "packages/$PROFILE-$VERSION-el8-amd64.rpm"
push "el/8" "packages/$PROFILE-$VERSION-el8-arm64.rpm"
push "el/9" "packages/$PROFILE-$VERSION-el9-amd64.rpm"
push "el/9" "packages/$PROFILE-$VERSION-el9-arm64.rpm"

View File

@ -1,7 +1,7 @@
name: Run Configuration tests
concurrency:
group: test-${{ github.event_name }}-${{ github.ref }}
group: conftest-${{ github.event_name }}-${{ github.ref }}
cancel-in-progress: true
on:

View File

@ -12,9 +12,8 @@ jobs:
strategy:
matrix:
builder:
- 5.0-34
- 5.0-35
otp:
- 24.3.4.2-3
- 25.1.2-3
# no need to use more than 1 version of Elixir, since tests
# run using only Erlang code. This is needed just to specify

View File

@ -17,7 +17,7 @@ jobs:
prepare:
runs-on: ubuntu-22.04
# prepare source with any OTP version, no need for a matrix
container: ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-24.3.4.2-3-debian11
container: ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-24.3.4.2-3-debian11
steps:
- uses: actions/checkout@v3
@ -50,7 +50,7 @@ jobs:
os:
- ["debian11", "debian:11-slim"]
builder:
- 5.0-34
- 5.0-35
otp:
- 24.3.4.2-3
elixir:
@ -123,7 +123,7 @@ jobs:
os:
- ["debian11", "debian:11-slim"]
builder:
- 5.0-34
- 5.0-35
otp:
- 24.3.4.2-3
elixir:

View File

@ -15,7 +15,7 @@ concurrency:
jobs:
relup_test_plan:
runs-on: ubuntu-22.04
container: "ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-24.3.4.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-24.3.4.2-3-ubuntu22.04"
outputs:
CUR_EE_VSN: ${{ steps.find-versions.outputs.CUR_EE_VSN }}
OLD_VERSIONS: ${{ steps.find-versions.outputs.OLD_VERSIONS }}

View File

@ -34,12 +34,12 @@ jobs:
MATRIX="$(echo "${APPS}" | jq -c '
[
(.[] | select(.profile == "emqx") | . + {
builder: "5.0-34",
builder: "5.0-35",
otp: "25.1.2-3",
elixir: "1.13.4"
}),
(.[] | select(.profile == "emqx-enterprise") | . + {
builder: "5.0-34",
builder: "5.0-35",
otp: ["24.3.4.2-3", "25.1.2-3"][],
elixir: "1.13.4"
})
@ -109,7 +109,9 @@ jobs:
- uses: actions/cache@v3
with:
path: "source/emqx_dialyzer_${{ matrix.otp }}_plt"
key: rebar3-dialyzer-plt-${{ matrix.profile }}-${{ matrix.otp }}
key: rebar3-dialyzer-plt-${{ matrix.profile }}-${{ matrix.otp }}-${{ hashFiles('source/rebar.*', 'source/apps/*/rebar.*', 'source/lib-ee/*/rebar.*') }}
restore-keys: |
rebar3-dialyzer-plt-${{ matrix.profile }}-${{ matrix.otp }}-
- name: run static checks
env:
PROFILE: ${{ matrix.profile }}
@ -256,7 +258,7 @@ jobs:
- ct
- ct_docker
runs-on: ubuntu-22.04
container: "ghcr.io/emqx/emqx-builder/5.0-34:1.13.4-24.3.4.2-3-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.0-35:1.13.4-24.3.4.2-3-ubuntu22.04"
steps:
- uses: AutoModality/action-clean@v1
- uses: actions/download-artifact@v3

View File

@ -30,16 +30,32 @@
)
).
-define(assertInclude(PATTERN, LIST),
?assert(
lists:any(
fun(X__Elem_) ->
case X__Elem_ of
PATTERN -> true;
_ -> false
end
end,
LIST
)
)
-define(drainMailbox(),
(fun F__Flush_() ->
receive
X__Msg_ -> [X__Msg_ | F__Flush_()]
after 0 -> []
end
end)()
).
-define(assertReceive(PATTERN),
?assertReceive(PATTERN, 1000)
).
-define(assertReceive(PATTERN, TIMEOUT),
(fun() ->
receive
X__V = PATTERN -> X__V
after TIMEOUT ->
erlang:error(
{assertReceive, [
{module, ?MODULE},
{line, ?LINE},
{expression, (??PATTERN)},
{mailbox, ?drainMailbox()}
]}
)
end
end)()
).

View File

@ -32,7 +32,7 @@
%% `apps/emqx/src/bpapi/README.md'
%% Community edition
-define(EMQX_RELEASE_CE, "5.0.25").
-define(EMQX_RELEASE_CE, "5.0.26-alpha.1").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.0.4").

View File

@ -45,6 +45,5 @@
{emqx_rule_engine,1}.
{emqx_shared_sub,1}.
{emqx_slow_subs,1}.
{emqx_statsd,1}.
{emqx_telemetry,1}.
{emqx_topic_metrics,1}.

View File

@ -27,9 +27,9 @@
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.4"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.7"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},

View File

@ -112,8 +112,8 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) ->
end,
ok.
id_for_log(console) -> "log.console_handler";
id_for_log(Other) -> "log.file_handlers." ++ atom_to_list(Other).
id_for_log(console) -> "log.console";
id_for_log(Other) -> "log.file." ++ atom_to_list(Other).
atom(Id) when is_binary(Id) -> binary_to_atom(Id, utf8);
atom(Id) when is_atom(Id) -> Id.
@ -126,12 +126,12 @@ tr_handlers(Conf) ->
%% For the default logger that outputs to console
tr_console_handler(Conf) ->
case conf_get("log.console_handler.enable", Conf) of
case conf_get("log.console.enable", Conf) of
true ->
ConsoleConf = conf_get("log.console_handler", Conf),
ConsoleConf = conf_get("log.console", Conf),
[
{handler, console, logger_std_h, #{
level => conf_get("log.console_handler.level", Conf),
level => conf_get("log.console.level", Conf),
config => (log_handler_conf(ConsoleConf))#{type => standard_io},
formatter => log_formatter(ConsoleConf),
filters => log_filter(ConsoleConf)
@ -150,14 +150,10 @@ tr_file_handler({HandlerName, SubConf}) ->
{handler, atom(HandlerName), logger_disk_log_h, #{
level => conf_get("level", SubConf),
config => (log_handler_conf(SubConf))#{
type =>
case conf_get("rotation.enable", SubConf) of
true -> wrap;
_ -> halt
end,
file => conf_get("file", SubConf),
max_no_files => conf_get("rotation.count", SubConf),
max_no_bytes => conf_get("max_size", SubConf)
type => wrap,
file => conf_get("to", SubConf),
max_no_files => conf_get("rotation_count", SubConf),
max_no_bytes => conf_get("rotation_size", SubConf)
},
formatter => log_formatter(SubConf),
filters => log_filter(SubConf),
@ -165,14 +161,11 @@ tr_file_handler({HandlerName, SubConf}) ->
}}.
logger_file_handlers(Conf) ->
Handlers = maps:to_list(conf_get("log.file_handlers", Conf, #{})),
lists:filter(
fun({_Name, Opts}) ->
B = conf_get("enable", Opts),
true = is_boolean(B),
B
fun({_Name, Handler}) ->
conf_get("enable", Handler, false)
end,
Handlers
maps:to_list(conf_get("log.file", Conf, #{}))
).
conf_get(Key, Conf) -> emqx_schema:conf_get(Key, Conf).
@ -237,12 +230,8 @@ log_filter(Conf) ->
end.
tr_level(Conf) ->
ConsoleLevel = conf_get("log.console_handler.level", Conf, undefined),
FileLevels = [
conf_get("level", SubConf)
|| {_, SubConf} <-
logger_file_handlers(Conf)
],
ConsoleLevel = conf_get("log.console.level", Conf, undefined),
FileLevels = [conf_get("level", SubConf) || {_, SubConf} <- logger_file_handlers(Conf)],
case FileLevels ++ [ConsoleLevel || ConsoleLevel =/= undefined] of
%% warning is the default level we should use
[] -> warning;

View File

@ -3,7 +3,7 @@
{id, "emqx"},
{description, "EMQX Core"},
% strict semver, bump manually!
{vsn, "5.0.26"},
{vsn, "5.0.27"},
{modules, []},
{registered, []},
{applications, [

View File

@ -184,11 +184,18 @@ run_fold_hook(HookPoint, Args, Acc) ->
-spec get_config(emqx_utils_maps:config_key_path()) -> term().
get_config(KeyPath) ->
emqx_config:get(KeyPath).
KeyPath1 = emqx_config:ensure_atom_conf_path(KeyPath, {raise_error, config_not_found}),
emqx_config:get(KeyPath1).
-spec get_config(emqx_utils_maps:config_key_path(), term()) -> term().
get_config(KeyPath, Default) ->
emqx_config:get(KeyPath, Default).
try
KeyPath1 = emqx_config:ensure_atom_conf_path(KeyPath, {raise_error, config_not_found}),
emqx_config:get(KeyPath1, Default)
catch
error:config_not_found ->
Default
end.
-spec get_raw_config(emqx_utils_maps:config_key_path()) -> term().
get_raw_config(KeyPath) ->

View File

@ -29,9 +29,13 @@
authn_type/1
]).
-ifdef(TEST).
-export([convert_certs/2, convert_certs/3, clear_certs/2]).
-endif.
%% Used in emqx_gateway
-export([
certs_dir/2,
convert_certs/2,
convert_certs/3,
clear_certs/2
]).
-export_type([config/0]).

View File

@ -256,9 +256,7 @@ init(
),
{NClientInfo, NConnInfo} = take_ws_cookie(ClientInfo, ConnInfo),
#channel{
%% We remove the peercert because it duplicates to what's stored in the socket,
%% Saving a copy here causes unnecessary wast of memory (about 1KB per connection).
conninfo = maps:put(peercert, undefined, NConnInfo),
conninfo = NConnInfo,
clientinfo = NClientInfo,
topic_aliases = #{
inbound => #{},
@ -1217,7 +1215,7 @@ handle_call(
}
) ->
ClientId = info(clientid, Channel),
NKeepalive = emqx_keepalive:set(interval, Interval * 1000, KeepAlive),
NKeepalive = emqx_keepalive:update(timer:seconds(Interval), KeepAlive),
NConnInfo = maps:put(keepalive, Interval, ConnInfo),
NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo},
SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}),
@ -2004,10 +2002,21 @@ ensure_connected(
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
conninfo = NConnInfo,
conninfo = trim_conninfo(NConnInfo),
conn_state = connected
}.
trim_conninfo(ConnInfo) ->
maps:without(
[
%% NOTE
%% We remove the peercert because it duplicates what's stored in the socket,
%% otherwise it wastes about 1KB per connection.
peercert
],
ConnInfo
).
%%--------------------------------------------------------------------
%% Init Alias Maximum
@ -2040,9 +2049,9 @@ ensure_keepalive_timer(0, Channel) ->
ensure_keepalive_timer(disabled, Channel) ->
Channel;
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
Backoff = get_mqtt_conf(Zone, keepalive_backoff),
RecvOct = emqx_pd:get_counter(incoming_bytes),
Keepalive = emqx_keepalive:init(RecvOct, round(timer:seconds(Interval) * Backoff)),
Multiplier = get_mqtt_conf(Zone, keepalive_multiplier),
RecvCnt = emqx_pd:get_counter(recv_pkt),
Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) ->
@ -2151,7 +2160,8 @@ publish_will_msg(
ok;
false ->
NMsg = emqx_mountpoint:mount(MountPoint, Msg),
_ = emqx_broker:publish(NMsg),
NMsg2 = NMsg#message{timestamp = erlang:system_time(millisecond)},
_ = emqx_broker:publish(NMsg2),
ok
end.

View File

@ -88,6 +88,8 @@
remove_handlers/0
]).
-export([ensure_atom_conf_path/2]).
-ifdef(TEST).
-export([erase_all/0]).
-endif.
@ -113,7 +115,8 @@
update_cmd/0,
update_args/0,
update_error/0,
update_result/0
update_result/0,
runtime_config_key_path/0
]).
-type update_request() :: term().
@ -144,6 +147,8 @@
-type config() :: #{atom() => term()} | list() | undefined.
-type app_envs() :: [proplists:property()].
-type runtime_config_key_path() :: [atom()].
%% @doc For the given path, get root value enclosed in a single-key map.
-spec get_root(emqx_utils_maps:config_key_path()) -> map().
get_root([RootName | _]) ->
@ -156,25 +161,21 @@ get_root_raw([RootName | _]) ->
%% @doc Get a config value for the given path.
%% The path should at least include root config name.
-spec get(emqx_utils_maps:config_key_path()) -> term().
-spec get(runtime_config_key_path()) -> term().
get(KeyPath) -> do_get(?CONF, KeyPath).
-spec get(emqx_utils_maps:config_key_path(), term()) -> term().
-spec get(runtime_config_key_path(), term()) -> term().
get(KeyPath, Default) -> do_get(?CONF, KeyPath, Default).
-spec find(emqx_utils_maps:config_key_path()) ->
-spec find(runtime_config_key_path()) ->
{ok, term()} | {not_found, emqx_utils_maps:config_key_path(), term()}.
find([]) ->
case do_get(?CONF, [], ?CONFIG_NOT_FOUND_MAGIC) of
?CONFIG_NOT_FOUND_MAGIC -> {not_found, []};
Res -> {ok, Res}
end;
find(KeyPath) ->
atom_conf_path(
KeyPath,
fun(AtomKeyPath) -> emqx_utils_maps:deep_find(AtomKeyPath, get_root(KeyPath)) end,
{return, {not_found, KeyPath}}
).
find(AtomKeyPath) ->
emqx_utils_maps:deep_find(AtomKeyPath, get_root(AtomKeyPath)).
-spec find_raw(emqx_utils_maps:config_key_path()) ->
{ok, term()} | {not_found, emqx_utils_maps:config_key_path(), term()}.
@ -712,21 +713,14 @@ do_put(Type, Putter, [RootName | KeyPath], DeepValue) ->
NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue),
persistent_term:put(?PERSIS_KEY(Type, RootName), NewValue).
do_deep_get(?CONF, KeyPath, Map, Default) ->
atom_conf_path(
KeyPath,
fun(AtomKeyPath) -> emqx_utils_maps:deep_get(AtomKeyPath, Map, Default) end,
{return, Default}
);
do_deep_get(?CONF, AtomKeyPath, Map, Default) ->
emqx_utils_maps:deep_get(AtomKeyPath, Map, Default);
do_deep_get(?RAW_CONF, KeyPath, Map, Default) ->
emqx_utils_maps:deep_get([bin(Key) || Key <- KeyPath], Map, Default).
do_deep_put(?CONF, Putter, KeyPath, Map, Value) ->
atom_conf_path(
KeyPath,
fun(AtomKeyPath) -> Putter(AtomKeyPath, Map, Value) end,
{raise_error, {not_found, KeyPath}}
);
AtomKeyPath = ensure_atom_conf_path(KeyPath, {raise_error, {not_found, KeyPath}}),
Putter(AtomKeyPath, Map, Value);
do_deep_put(?RAW_CONF, Putter, KeyPath, Map, Value) ->
Putter([bin(Key) || Key <- KeyPath], Map, Value).
@ -773,15 +767,24 @@ conf_key(?CONF, RootName) ->
conf_key(?RAW_CONF, RootName) ->
bin(RootName).
atom_conf_path(Path, ExpFun, OnFail) ->
try [atom(Key) || Key <- Path] of
AtomKeyPath -> ExpFun(AtomKeyPath)
ensure_atom_conf_path(Path, OnFail) ->
case lists:all(fun erlang:is_atom/1, Path) of
true ->
%% Do not try to build new atom PATH if it already is.
Path;
_ ->
to_atom_conf_path(Path, OnFail)
end.
to_atom_conf_path(Path, OnFail) ->
try
[atom(Key) || Key <- Path]
catch
error:badarg ->
case OnFail of
{return, Val} ->
Val;
{raise_error, Err} ->
error(Err)
error(Err);
{return, V} ->
V
end
end.

View File

@ -22,7 +22,7 @@
info/1,
info/2,
check/2,
set/3
update/2
]).
-elvis([{elvis_style, no_if_expression, disable}]).
@ -31,66 +31,16 @@
-record(keepalive, {
interval :: pos_integer(),
statval :: non_neg_integer(),
repeat :: non_neg_integer()
statval :: non_neg_integer()
}).
-opaque keepalive() :: #keepalive{}.
-define(MAX_INTERVAL, 65535000).
%% @doc Init keepalive.
-spec init(Interval :: non_neg_integer()) -> keepalive().
init(Interval) -> init(0, Interval).
%% @doc Init keepalive.
-spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive().
init(StatVal, Interval) when Interval > 0 ->
#keepalive{
interval = Interval,
statval = StatVal,
repeat = 0
}.
%% @doc Get Info of the keepalive.
-spec info(keepalive()) -> emqx_types:infos().
info(#keepalive{
interval = Interval,
statval = StatVal,
repeat = Repeat
}) ->
#{
interval => Interval,
statval => StatVal,
repeat => Repeat
}.
-spec info(interval | statval | repeat, keepalive()) ->
non_neg_integer().
info(interval, #keepalive{interval = Interval}) ->
Interval;
info(statval, #keepalive{statval = StatVal}) ->
StatVal;
info(repeat, #keepalive{repeat = Repeat}) ->
Repeat.
%% @doc Check keepalive.
-spec check(non_neg_integer(), keepalive()) ->
{ok, keepalive()} | {error, timeout}.
check(
NewVal,
KeepAlive = #keepalive{
statval = OldVal,
repeat = Repeat
}
) ->
if
NewVal =/= OldVal ->
{ok, KeepAlive#keepalive{statval = NewVal, repeat = 0}};
Repeat < 1 ->
{ok, KeepAlive#keepalive{repeat = Repeat + 1}};
true ->
{error, timeout}
end.
%% from mqtt-v3.1.1 specific
%% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism.
%% This means that, in this case, the Server is not required
@ -102,7 +52,43 @@ check(
%%The actual value of the Keep Alive is application specific;
%% typically this is a few minutes.
%% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds.
%% @doc Update keepalive's interval
-spec set(interval, non_neg_integer(), keepalive()) -> keepalive().
set(interval, Interval, KeepAlive) when Interval >= 0 andalso Interval =< 65535000 ->
KeepAlive#keepalive{interval = Interval}.
%% @doc Init keepalive.
-spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive() | undefined.
init(StatVal, Interval) when Interval > 0 andalso Interval =< ?MAX_INTERVAL ->
#keepalive{interval = Interval, statval = StatVal};
init(_, 0) ->
undefined;
init(StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(StatVal, ?MAX_INTERVAL).
%% @doc Get Info of the keepalive.
-spec info(keepalive()) -> emqx_types:infos().
info(#keepalive{
interval = Interval,
statval = StatVal
}) ->
#{
interval => Interval,
statval => StatVal
}.
-spec info(interval | statval, keepalive()) ->
non_neg_integer().
info(interval, #keepalive{interval = Interval}) ->
Interval;
info(statval, #keepalive{statval = StatVal}) ->
StatVal;
info(interval, undefined) ->
0.
%% @doc Check keepalive.
-spec check(non_neg_integer(), keepalive()) ->
{ok, keepalive()} | {error, timeout}.
check(Val, #keepalive{statval = Val}) -> {error, timeout};
check(Val, KeepAlive) -> {ok, KeepAlive#keepalive{statval = Val}}.
%% @doc Update keepalive.
%% The statval of the previous keepalive will be used,
%% and normal checks will begin from the next cycle.
-spec update(non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined.
update(Interval, undefined) -> init(0, Interval);
update(Interval, #keepalive{statval = StatVal}) -> init(StatVal, Interval).

View File

@ -277,9 +277,8 @@ restart_listener(Type, ListenerName, Conf) ->
restart_listener(Type, ListenerName, Conf, Conf).
restart_listener(Type, ListenerName, OldConf, NewConf) ->
case do_stop_listener(Type, ListenerName, OldConf) of
case stop_listener(Type, ListenerName, OldConf) of
ok -> start_listener(Type, ListenerName, NewConf);
{error, not_found} -> start_listener(Type, ListenerName, NewConf);
{error, Reason} -> {error, Reason}
end.
@ -296,42 +295,63 @@ stop_listener(ListenerId) ->
apply_on_listener(ListenerId, fun stop_listener/3).
stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
case do_stop_listener(Type, ListenerName, Conf) of
Id = listener_id(Type, ListenerName),
ok = del_limiter_bucket(Id, Conf),
case do_stop_listener(Type, Id, Conf) of
ok ->
console_print(
"Listener ~ts on ~ts stopped.~n",
[listener_id(Type, ListenerName), format_bind(Bind)]
[Id, format_bind(Bind)]
),
ok;
{error, not_found} ->
?ELOG(
"Failed to stop listener ~ts on ~ts: ~0p~n",
[listener_id(Type, ListenerName), format_bind(Bind), already_stopped]
),
ok;
{error, Reason} ->
?ELOG(
"Failed to stop listener ~ts on ~ts: ~0p~n",
[listener_id(Type, ListenerName), format_bind(Bind), Reason]
[Id, format_bind(Bind), Reason]
),
{error, Reason}
end.
-spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}.
do_stop_listener(Type, ListenerName, #{bind := ListenOn} = Conf) when Type == tcp; Type == ssl ->
Id = listener_id(Type, ListenerName),
del_limiter_bucket(Id, Conf),
do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == tcp; Type == ssl ->
esockd:close(Id, ListenOn);
do_stop_listener(Type, ListenerName, Conf) when Type == ws; Type == wss ->
Id = listener_id(Type, ListenerName),
del_limiter_bucket(Id, Conf),
cowboy:stop_listener(Id);
do_stop_listener(quic, ListenerName, Conf) ->
Id = listener_id(quic, ListenerName),
del_limiter_bucket(Id, Conf),
do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == ws; Type == wss ->
case cowboy:stop_listener(Id) of
ok ->
wait_listener_stopped(ListenOn);
Error ->
Error
end;
do_stop_listener(quic, Id, _Conf) ->
quicer:stop_listener(Id).
wait_listener_stopped(ListenOn) ->
% NOTE
% `cowboy:stop_listener/1` will not close the listening socket explicitly,
% it will be closed by the runtime system **only after** the process exits.
Endpoint = maps:from_list(ip_port(ListenOn)),
case
gen_tcp:connect(
maps:get(ip, Endpoint, loopback),
maps:get(port, Endpoint),
[{active, false}]
)
of
{error, _EConnrefused} ->
%% NOTE
%% We should get `econnrefused` here because acceptors are already dead
%% but don't want to crash if not, because this doesn't make any difference.
ok;
{ok, Socket} ->
%% NOTE
%% Tiny chance to get a connected socket here, when some other process
%% concurrently binds to the same port.
gen_tcp:close(Socket)
end.
-ifndef(TEST).
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
-else.

View File

@ -129,7 +129,7 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := Qos0}) ->
#mqueue{
max_len = MaxLen,
store_qos0 = Qos0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
p_table = p_table(get_opt(priorities, Opts, ?NO_PRIORITY_TABLE)),
default_p = get_priority_opt(Opts),
shift_opts = get_shift_opt(Opts)
}.
@ -295,3 +295,18 @@ get_shift_opt(Opts) ->
multiplier = Mult,
base = Base
}.
%% topic from mqtt.mqueue_priorities(map()) is atom.
p_table(PTab = #{}) ->
maps:fold(
fun
(Topic, Priority, Acc) when is_atom(Topic) ->
maps:put(atom_to_binary(Topic), Priority, Acc);
(Topic, Priority, Acc) when is_binary(Topic) ->
maps:put(Topic, Priority, Acc)
end,
#{},
PTab
);
p_table(PTab) ->
PTab.

View File

@ -78,6 +78,7 @@
validate_heap_size/1,
user_lookup_fun_tr/2,
validate_alarm_actions/1,
validate_keepalive_multiplier/1,
non_empty_string/1,
validations/0,
naive_env_interpolation/1
@ -110,7 +111,8 @@
servers_validator/2,
servers_sc/2,
convert_servers/1,
convert_servers/2
convert_servers/2,
mqtt_converter/2
]).
%% tombstone types
@ -151,6 +153,8 @@
-define(BIT(Bits), (1 bsl (Bits))).
-define(MAX_UINT(Bits), (?BIT(Bits) - 1)).
-define(DEFAULT_MULTIPLIER, 1.5).
-define(DEFAULT_BACKOFF, 0.75).
namespace() -> broker.
@ -173,6 +177,7 @@ roots(high) ->
ref("mqtt"),
#{
desc => ?DESC(mqtt),
converter => fun ?MODULE:mqtt_converter/2,
importance => ?IMPORTANCE_MEDIUM
}
)},
@ -523,8 +528,19 @@ fields("mqtt") ->
sc(
number(),
#{
default => 0.75,
desc => ?DESC(mqtt_keepalive_backoff)
default => ?DEFAULT_BACKOFF,
%% Must add required => false, zone schema has no default.
required => false,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"keepalive_multiplier",
sc(
number(),
#{
default => ?DEFAULT_MULTIPLIER,
validator => fun ?MODULE:validate_keepalive_multiplier/1,
desc => ?DESC(mqtt_keepalive_multiplier)
}
)},
{"max_subscriptions",
@ -593,7 +609,7 @@ fields("mqtt") ->
)},
{"mqueue_priorities",
sc(
hoconsc:union([map(), disabled]),
hoconsc:union([disabled, map()]),
#{
default => disabled,
desc => ?DESC(mqtt_mqueue_priorities)
@ -641,7 +657,7 @@ fields("mqtt") ->
)}
];
fields("zone") ->
emqx_zone_schema:zone();
emqx_zone_schema:zones_without_default();
fields("flapping_detect") ->
[
{"enable",
@ -2291,6 +2307,17 @@ common_ssl_opts_schema(Defaults, Type) ->
desc => ?DESC(common_ssl_opts_schema_secure_renegotiate)
}
)},
{"log_level",
sc(
hoconsc:enum([
emergency, alert, critical, error, warning, notice, info, debug, none, all
]),
#{
default => notice,
desc => ?DESC(common_ssl_opts_schema_log_level),
importance => ?IMPORTANCE_LOW
}
)},
{"hibernate_after",
sc(
@ -2735,6 +2762,13 @@ validate_heap_size(Siz) when is_integer(Siz) ->
validate_heap_size(_SizStr) ->
{error, invalid_heap_size}.
validate_keepalive_multiplier(Multiplier) when
is_number(Multiplier) andalso Multiplier >= 1.0 andalso Multiplier =< 65535.0
->
ok;
validate_keepalive_multiplier(_Multiplier) ->
{error, #{reason => keepalive_multiplier_out_of_range, min => 1, max => 65535}}.
validate_alarm_actions(Actions) ->
UnSupported = lists:filter(
fun(Action) -> Action =/= log andalso Action =/= publish end, Actions
@ -3381,3 +3415,20 @@ ensure_default_listener(Map, ListenerType) ->
cert_file(_File, client) -> undefined;
cert_file(File, server) -> iolist_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])).
mqtt_converter(#{<<"keepalive_multiplier">> := Multi} = Mqtt, _Opts) ->
case round(Multi * 100) =:= round(?DEFAULT_MULTIPLIER * 100) of
false ->
%% Multiplier is provided, and it's not default value
Mqtt;
true ->
%% Multiplier is default value, fallback to use Backoff value
%% Backoff default value was half of Multiplier default value
%% so there is no need to compare Backoff with its default.
Backoff = maps:get(<<"keepalive_backoff">>, Mqtt, ?DEFAULT_BACKOFF),
Mqtt#{<<"keepalive_multiplier">> => Backoff * 2}
end;
mqtt_converter(#{<<"keepalive_backoff">> := Backoff} = Mqtt, _Opts) ->
Mqtt#{<<"keepalive_multiplier">> => Backoff * 2};
mqtt_converter(Mqtt, _Opts) ->
Mqtt.

View File

@ -21,6 +21,10 @@
%% API:
-export([wrap/1, unwrap/1]).
-export_type([t/1]).
-opaque t(T) :: T | fun(() -> t(T)).
%%================================================================================
%% API funcions
%%================================================================================

View File

@ -291,16 +291,16 @@ stats(Session) -> info(?STATS_KEYS, Session).
ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
Subs = info(subscriptions, Session),
lists:dropwhile(
lists:filter(
fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
case maps:find(Topic, Subs) of
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.no_local'),
true;
false;
_ ->
false
true
end
end,
Delivers

View File

@ -158,9 +158,18 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
-spec strategy(emqx_topic:group()) -> strategy().
strategy(Group) ->
case emqx:get_config([broker, shared_subscription_group, Group, strategy], undefined) of
undefined -> emqx:get_config([broker, shared_subscription_strategy]);
Strategy -> Strategy
try
emqx:get_config([
broker,
shared_subscription_group,
binary_to_existing_atom(Group),
strategy
])
catch
error:{config_not_found, _} ->
get_default_shared_subscription_strategy();
error:badarg ->
get_default_shared_subscription_strategy()
end.
-spec ack_enabled() -> boolean().
@ -544,3 +553,6 @@ delete_route_if_needed({Group, Topic} = GroupTopic) ->
if_no_more_subscribers(GroupTopic, fun() ->
ok = emqx_router:do_delete_route(Topic, {Group, node()})
end).
get_default_shared_subscription_strategy() ->
emqx:get_config([broker, shared_subscription_strategy]).

View File

@ -144,7 +144,7 @@ list() ->
list(Enable) ->
ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}).
-spec create([{Key :: binary(), Value :: binary()}] | #{atom() => binary()}) ->
-spec create([{Key :: binary(), Value :: any()}] | #{atom() => any()}) ->
{ok, #?TRACE{}}
| {error,
{duplicate_condition, iodata()}

View File

@ -131,7 +131,7 @@
socktype := socktype(),
sockname := peername(),
peername := peername(),
peercert := nossl | undefined | esockd_peercert:peercert(),
peercert => nossl | undefined | esockd_peercert:peercert(),
conn_mod := module(),
proto_name => binary(),
proto_ver => proto_ver(),

View File

@ -18,7 +18,8 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-export([namespace/0, roots/0, fields/1, desc/1, zone/0, zone_without_hidden/0]).
-export([namespace/0, roots/0, fields/1, desc/1]).
-export([zones_without_default/0, global_zone_with_default/0]).
namespace() -> zone.
@ -35,7 +36,7 @@ roots() ->
"overload_protection"
].
zone() ->
zones_without_default() ->
Fields = roots(),
Hidden = hidden(),
lists:map(
@ -50,8 +51,8 @@ zone() ->
Fields
).
zone_without_hidden() ->
lists:map(fun(F) -> {F, ?HOCON(?R_REF(F), #{})} end, roots() -- hidden()).
global_zone_with_default() ->
lists:map(fun(F) -> {F, ?HOCON(?R_REF(emqx_schema, F), #{})} end, roots() -- hidden()).
hidden() ->
[
@ -69,9 +70,10 @@ fields(Name) ->
desc(Name) ->
emqx_schema:desc(Name).
%% no default values for zone settings
%% no default values for zone settings, don't required either.
no_default(Sc) ->
fun
(default) -> undefined;
(required) -> false;
(Other) -> hocon_schema:field_schema(Sc, Other)
end.

View File

@ -156,6 +156,19 @@ t_cluster_nodes(_) ->
?assertEqual(Expected, emqx:cluster_nodes(cores)),
?assertEqual([], emqx:cluster_nodes(stopped)).
t_get_config(_) ->
?assertEqual(false, emqx:get_config([overload_protection, enable])),
?assertEqual(false, emqx:get_config(["overload_protection", <<"enable">>])).
t_get_config_default_1(_) ->
?assertEqual(false, emqx:get_config([overload_protection, enable], undefined)),
?assertEqual(false, emqx:get_config(["overload_protection", <<"enable">>], undefined)).
t_get_config_default_2(_) ->
AtomPathRes = emqx:get_config([overload_protection, <<"_!no_@exist_">>], undefined),
NonAtomPathRes = emqx:get_config(["doesnotexist", <<"db_backend">>], undefined),
?assertEqual(undefined, NonAtomPathRes),
?assertEqual(undefined, AtomPathRes).
%%--------------------------------------------------------------------
%% Hook fun
%%--------------------------------------------------------------------

View File

@ -116,7 +116,6 @@ clientinfo(InitProps) ->
username => <<"username">>,
password => <<"passwd">>,
is_superuser => false,
peercert => undefined,
mountpoint => undefined
},
InitProps

View File

@ -51,6 +51,8 @@
"gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria, amqp_client, rabbit_common"
).
-define(IGNORED_MODULES, "emqx_rpc").
-define(FORCE_DELETED_MODULES, [emqx_statsd, emqx_statsd_proto_v1]).
-define(FORCE_DELETED_APIS, [{emqx_statsd, 1}]).
%% List of known RPC backend modules:
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
%% List of known functions also known to do RPC:
@ -127,11 +129,16 @@ check_api_immutability(#{release := Rel1, api := APIs1}, #{release := Rel2, api
Val ->
ok;
undefined ->
setnok(),
logger:error(
"API ~p v~p was removed in release ~p without being deprecated.",
[API, Version, Rel2]
);
case lists:member({API, Version}, ?FORCE_DELETED_APIS) of
true ->
ok;
false ->
setnok(),
logger:error(
"API ~p v~p was removed in release ~p without being deprecated.",
[API, Version, Rel2]
)
end;
_Val ->
setnok(),
logger:error(
@ -146,16 +153,24 @@ check_api_immutability(#{release := Rel1, api := APIs1}, #{release := Rel2, api
check_api_immutability(_, _) ->
ok.
filter_calls(Calls) ->
F = fun({{Mf, _, _}, {Mt, _, _}}) ->
(not lists:member(Mf, ?FORCE_DELETED_MODULES)) andalso
(not lists:member(Mt, ?FORCE_DELETED_MODULES))
end,
lists:filter(F, Calls).
%% Note: sets nok flag
-spec typecheck_apis(fulldump(), fulldump()) -> ok.
typecheck_apis(
#{release := CallerRelease, api := CallerAPIs, signatures := CallerSigs},
#{release := CalleeRelease, signatures := CalleeSigs}
) ->
AllCalls = lists:flatten([
AllCalls0 = lists:flatten([
[Calls, Casts]
|| #{calls := Calls, casts := Casts} <- maps:values(CallerAPIs)
]),
AllCalls = filter_calls(AllCalls0),
lists:foreach(
fun({From, To}) ->
Caller = get_param_types(CallerSigs, From),
@ -213,7 +228,7 @@ get_param_types(Signatures, {M, F, A}) ->
maps:from_list(lists:zip(A, AttrTypes));
_ ->
logger:critical("Call ~p:~p/~p is not found in PLT~n", [M, F, Arity]),
error(badkey)
error({badkey, {M, F, A}})
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -1211,7 +1211,6 @@ clientinfo(InitProps) ->
clientid => <<"clientid">>,
username => <<"username">>,
is_superuser => false,
peercert => undefined,
mountpoint => undefined
},
InitProps

View File

@ -22,6 +22,8 @@
-import(lists, [nth/2]).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -75,7 +77,8 @@ groups() ->
t_username_as_clientid,
t_certcn_as_clientid_default_config_tls,
t_certcn_as_clientid_tlsv1_3,
t_certcn_as_clientid_tlsv1_2
t_certcn_as_clientid_tlsv1_2,
t_peercert_preserved_before_connected
]}
].
@ -379,6 +382,42 @@ t_certcn_as_clientid_tlsv1_3(_) ->
t_certcn_as_clientid_tlsv1_2(_) ->
tls_certcn_as_clientid('tlsv1.2').
t_peercert_preserved_before_connected(_) ->
ok = emqx_config:put_zone_conf(default, [mqtt], #{}),
ok = emqx_hooks:add(
'client.connect',
{?MODULE, on_hook, ['client.connect', self()]},
?HP_HIGHEST
),
ok = emqx_hooks:add(
'client.connected',
{?MODULE, on_hook, ['client.connected', self()]},
?HP_HIGHEST
),
ClientId = atom_to_binary(?FUNCTION_NAME),
SslConf = emqx_common_test_helpers:client_ssl_twoway(default),
{ok, Client} = emqtt:start_link([
{port, 8883},
{clientid, ClientId},
{ssl, true},
{ssl_opts, SslConf}
]),
{ok, _} = emqtt:connect(Client),
_ = ?assertReceive({'client.connect', #{peercert := PC}} when is_binary(PC)),
_ = ?assertReceive({'client.connected', #{peercert := PC}} when is_binary(PC)),
[ConnPid] = emqx_cm:lookup_channels(ClientId),
?assertMatch(
#{conninfo := ConnInfo} when not is_map_key(peercert, ConnInfo),
emqx_connection:info(ConnPid)
).
on_hook(ConnInfo, _, 'client.connect' = HP, Pid) ->
_ = Pid ! {HP, ConnInfo},
ok;
on_hook(_ClientInfo, ConnInfo, 'client.connected' = HP, Pid) ->
_ = Pid ! {HP, ConnInfo},
ok.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
@ -421,10 +460,4 @@ tls_certcn_as_clientid(TLSVsn, RequiredTLSVsn) ->
{ok, _} = emqtt:connect(Client),
#{clientinfo := #{clientid := CN}} = emqx_cm:get_chan_info(CN),
confirm_tls_version(Client, RequiredTLSVsn),
%% verify that the peercert won't be stored in the conninfo
[ChannPid] = emqx_cm:lookup_channels(CN),
SysState = sys:get_state(ChannPid),
ChannelRecord = lists:keyfind(channel, 1, tuple_to_list(SysState)),
ConnInfo = lists:nth(2, tuple_to_list(ChannelRecord)),
?assertMatch(#{peercert := undefined}, ConnInfo),
emqtt:disconnect(Client).

View File

@ -232,11 +232,12 @@ render_and_load_app_config(App, Opts) ->
try
do_render_app_config(App, Schema, Conf, Opts)
catch
throw:skip ->
ok;
throw:E:St ->
%% turn throw into error
error({Conf, E, St})
end.
do_render_app_config(App, Schema, ConfigFile, Opts) ->
%% copy acl_conf must run before read_schema_configs
copy_acl_conf(),
@ -257,6 +258,7 @@ start_app(App, SpecAppConfig, Opts) ->
{ok, _} ->
ok = ensure_dashboard_listeners_started(App),
ok = wait_for_app_processes(App),
ok = perform_sanity_checks(App),
ok;
{error, Reason} ->
error({failed_to_start_app, App, Reason})
@ -270,6 +272,27 @@ wait_for_app_processes(emqx_conf) ->
wait_for_app_processes(_) ->
ok.
%% These are checks to detect inter-suite or inter-testcase flakiness
%% early. For example, one suite might forget one application running
%% and stop others, and then the `application:start/2' callback is
%% never called again for this application.
perform_sanity_checks(emqx_rule_engine) ->
ensure_config_handler(emqx_rule_engine, [rule_engine, rules]),
ok;
perform_sanity_checks(emqx_bridge) ->
ensure_config_handler(emqx_bridge, [bridges]),
ok;
perform_sanity_checks(_App) ->
ok.
ensure_config_handler(Module, ConfigPath) ->
#{handlers := Handlers} = sys:get_state(emqx_config_handler),
case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of
#{{mod} := Module} -> ok;
_NotFound -> error({config_handler_missing, ConfigPath, Module})
end,
ok.
app_conf_file(emqx_conf) -> "emqx.conf.all";
app_conf_file(App) -> atom_to_list(App) ++ ".conf".
@ -296,6 +319,7 @@ render_config_file(ConfigFile, Vars0) ->
Temp =
case file:read_file(ConfigFile) of
{ok, T} -> T;
{error, enoent} -> throw(skip);
{error, Reason} -> error({failed_to_read_config_template, ConfigFile, Reason})
end,
Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- maps:to_list(Vars0)],
@ -842,8 +866,8 @@ setup_node(Node, Opts) when is_map(Opts) ->
LoadSchema andalso
begin
%% to avoid sharing data between executions and/or
%% nodes. these variables might notbe in the
%% config file (e.g.: emqx_ee_conf_schema).
%% nodes. these variables might not be in the
%% config file (e.g.: emqx_enterprise_schema).
NodeDataDir = filename:join([
PrivDataDir,
node(),

View File

@ -676,7 +676,6 @@ channel(InitFields) ->
clientid => <<"clientid">>,
username => <<"username">>,
is_superuser => false,
peercert => undefined,
mountpoint => undefined
},
Conf = emqx_cm:get_session_confs(ClientInfo, #{

View File

@ -27,20 +27,14 @@ t_check(_) ->
Keepalive = emqx_keepalive:init(60),
?assertEqual(60, emqx_keepalive:info(interval, Keepalive)),
?assertEqual(0, emqx_keepalive:info(statval, Keepalive)),
?assertEqual(0, emqx_keepalive:info(repeat, Keepalive)),
Info = emqx_keepalive:info(Keepalive),
?assertEqual(
#{
interval => 60,
statval => 0,
repeat => 0
statval => 0
},
Info
),
{ok, Keepalive1} = emqx_keepalive:check(1, Keepalive),
?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)),
?assertEqual(0, emqx_keepalive:info(repeat, Keepalive1)),
{ok, Keepalive2} = emqx_keepalive:check(1, Keepalive1),
?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)),
?assertEqual(1, emqx_keepalive:info(repeat, Keepalive2)),
?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive2)).
?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive1)).

View File

@ -829,6 +829,42 @@ t_subscribe_no_local(Config) ->
?assertEqual(1, length(receive_messages(2))),
ok = emqtt:disconnect(Client1).
t_subscribe_no_local_mixed(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS),
{ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:ConnFun(Client2),
%% Given tow clients and client1 subscribe to topic with 'no local' set to true
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]),
%% When mixed publish traffic are sent from both clients (Client1 sent 6 and Client2 sent 2)
CB = {fun emqtt:sync_publish_result/3, [self(), async_res]},
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed1">>, 0, CB),
ok = emqtt:publish_async(Client2, Topic, <<"t_subscribe_no_local_mixed2">>, 0, CB),
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed3">>, 0, CB),
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed4">>, 0, CB),
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed5">>, 0, CB),
ok = emqtt:publish_async(Client2, Topic, <<"t_subscribe_no_local_mixed6">>, 0, CB),
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed7">>, 0, CB),
ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed8">>, 0, CB),
[
receive
{async_res, Res} -> ?assertEqual(ok, Res)
end
|| _ <- lists:seq(1, 8)
],
%% Then only two messages from clients 2 are received
PubRecvd = receive_messages(9),
ct:pal("~p", [PubRecvd]),
?assertEqual(2, length(PubRecvd)),
ok = emqtt:disconnect(Client1),
ok = emqtt:disconnect(Client2).
t_subscribe_actions(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS),

View File

@ -165,6 +165,7 @@ init_per_testcase(_TestCase, Config) ->
{ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}}
end
),
snabbkaffe:start_trace(),
_Heir = spawn_dummy_heir(),
{ok, CachePid} = emqx_ocsp_cache:start_link(),
DataDir = ?config(data_dir, Config),
@ -187,7 +188,6 @@ init_per_testcase(_TestCase, Config) ->
ConfBin = emqx_utils_maps:binary_key_map(Conf),
hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts),
snabbkaffe:start_trace(),
[
{cache_pid, CachePid}
| Config
@ -231,12 +231,19 @@ end_per_testcase(_TestCase, Config) ->
%% In some tests, we don't start the full supervision tree, so we need
%% this dummy process.
spawn_dummy_heir() ->
spawn_link(fun() ->
true = register(emqx_kernel_sup, self()),
receive
stop -> ok
end
end).
{_, {ok, _}} =
?wait_async_action(
spawn_link(fun() ->
true = register(emqx_kernel_sup, self()),
?tp(heir_name_registered, #{}),
receive
stop -> ok
end
end),
#{?snk_kind := heir_name_registered},
1_000
),
ok.
does_module_exist(Mod) ->
case erlang:module_loaded(Mod) of

View File

@ -655,6 +655,43 @@ password_converter_test() ->
?assertThrow("must_quote", emqx_schema:password_converter(foobar, #{})),
ok.
-define(MQTT(B, M), #{<<"keepalive_backoff">> => B, <<"keepalive_multiplier">> => M}).
keepalive_convert_test() ->
?assertEqual(undefined, emqx_schema:mqtt_converter(undefined, #{})),
DefaultBackoff = 0.75,
DefaultMultiplier = 1.5,
Default = ?MQTT(DefaultBackoff, DefaultMultiplier),
?assertEqual(Default, emqx_schema:mqtt_converter(Default, #{})),
?assertEqual(?MQTT(1.5, 3), emqx_schema:mqtt_converter(?MQTT(1.5, 3), #{})),
?assertEqual(
?MQTT(DefaultBackoff, 3), emqx_schema:mqtt_converter(?MQTT(DefaultBackoff, 3), #{})
),
?assertEqual(?MQTT(1, 2), emqx_schema:mqtt_converter(?MQTT(1, DefaultMultiplier), #{})),
?assertEqual(?MQTT(1.5, 3), emqx_schema:mqtt_converter(?MQTT(1.5, 3), #{})),
?assertEqual(#{}, emqx_schema:mqtt_converter(#{}, #{})),
?assertEqual(
#{<<"keepalive_backoff">> => 1.5, <<"keepalive_multiplier">> => 3.0},
emqx_schema:mqtt_converter(#{<<"keepalive_backoff">> => 1.5}, #{})
),
?assertEqual(
#{<<"keepalive_multiplier">> => 5.0},
emqx_schema:mqtt_converter(#{<<"keepalive_multiplier">> => 5.0}, #{})
),
?assertEqual(
#{
<<"keepalive_backoff">> => DefaultBackoff,
<<"keepalive_multiplier">> => DefaultMultiplier
},
emqx_schema:mqtt_converter(#{<<"keepalive_backoff">> => DefaultBackoff}, #{})
),
?assertEqual(
#{<<"keepalive_multiplier">> => DefaultMultiplier},
emqx_schema:mqtt_converter(#{<<"keepalive_multiplier">> => DefaultMultiplier}, #{})
),
ok.
url_type_test_() ->
[
?_assertEqual(

View File

@ -33,17 +33,6 @@
]
).
-define(STATS_KEYS, [
recv_oct,
recv_cnt,
send_oct,
send_cnt,
recv_pkt,
recv_msg,
send_pkt,
send_msg
]).
-define(ws_conn, emqx_ws_connection).
all() -> emqx_common_test_helpers:all(?MODULE).
@ -618,7 +607,6 @@ channel(InitFields) ->
clientid => <<"clientid">>,
username => <<"username">>,
is_superuser => false,
peercert => undefined,
mountpoint => undefined
},
Conf = emqx_cm:get_session_confs(ClientInfo, #{

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authn, [
{description, "EMQX Authentication"},
{vsn, "0.1.19"},
{vsn, "0.1.20"},
{modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [kernel, stdlib, emqx_resource, emqx_connector, ehttpc, epgsql, mysql, jose]},

View File

@ -805,7 +805,11 @@ with_listener(ListenerID, Fun) ->
find_listener(ListenerID) ->
case binary:split(ListenerID, <<":">>) of
[BType, BName] ->
case emqx_config:find([listeners, BType, BName]) of
case
emqx_config:find([
listeners, binary_to_existing_atom(BType), binary_to_existing_atom(BName)
])
of
{ok, _} ->
{ok, {BType, BName}};
{not_found, _, _} ->

View File

@ -100,7 +100,6 @@ common_fields() ->
maps:to_list(
maps:without(
[
base_url,
pool_type
],
maps:from_list(emqx_connector_http:fields(config))

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authz, [
{description, "An OTP application"},
{vsn, "0.1.20"},
{vsn, "0.1.21"},
{registered, []},
{mod, {emqx_authz_app, []}},
{applications, [

View File

@ -116,7 +116,6 @@ authz_http_common_fields() ->
maps:to_list(
maps:without(
[
base_url,
pool_type
],
maps:from_list(emqx_connector_http:fields(config))

View File

@ -240,7 +240,6 @@ http_common_fields() ->
maps:to_list(
maps:without(
[
base_url,
pool_type
],
maps:from_list(connector_fields(http))

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
{vsn, "0.1.19"},
{vsn, "0.1.20"},
{registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}},
{applications, [

View File

@ -687,11 +687,15 @@ get_metrics_from_local_node(BridgeType, BridgeName) ->
).
is_enabled_bridge(BridgeType, BridgeName) ->
try emqx:get_config([bridges, BridgeType, BridgeName]) of
try emqx:get_config([bridges, BridgeType, binary_to_existing_atom(BridgeName)]) of
ConfMap ->
maps:get(enable, ConfMap, false)
catch
error:{config_not_found, _} ->
throw(not_found);
error:badarg ->
%% catch non-existing atom,
%% none-existing atom means it is not available in config PT storage.
throw(not_found)
end.

View File

@ -68,7 +68,7 @@ basic_config() ->
)}
] ++ webhook_creation_opts() ++
proplists:delete(
max_retries, proplists:delete(base_url, emqx_connector_http:fields(config))
max_retries, emqx_connector_http:fields(config)
).
request_config() ->

View File

@ -160,6 +160,7 @@ init_node(Type) ->
ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, fun load_suite_config/1),
case Type of
primary ->
ok = emqx_dashboard_desc_cache:init(),
ok = emqx_config:put(
[dashboard, listeners],
#{http => #{enable => true, bind => 18083, proxy_header => false}}

View File

@ -121,7 +121,7 @@ assert_upgraded1(Map) ->
?assert(maps:is_key(<<"ssl">>, Map)).
check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf).
hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{required => false}).
%% erlfmt-ignore
%% this is config generated from v5.0.11

View File

@ -100,17 +100,21 @@
?assertMetrics(Pat, true, BridgeID)
).
-define(assertMetrics(Pat, Guard, BridgeID),
?assertMatch(
#{
<<"metrics">> := Pat,
<<"node_metrics">> := [
#{
<<"node">> := _,
<<"metrics">> := Pat
}
]
} when Guard,
request_bridge_metrics(BridgeID)
?retry(
_Sleep = 300,
_Attempts0 = 20,
?assertMatch(
#{
<<"metrics">> := Pat,
<<"node_metrics">> := [
#{
<<"node">> := _,
<<"metrics">> := Pat
}
]
} when Guard,
request_bridge_metrics(BridgeID)
)
)
).

View File

@ -23,6 +23,7 @@
-compile(export_all).
-import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-import(emqx_common_test_helpers, [on_exit/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -52,6 +53,13 @@ end_per_suite(_Config) ->
suite() ->
[{timetrap, {seconds, 60}}].
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(),
ok.
%%------------------------------------------------------------------------------
%% HTTP server for testing
%% (Orginally copied from emqx_bridge_api_SUITE)
@ -158,7 +166,8 @@ bridge_async_config(#{port := Port} = Config) ->
QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000),
ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"),
ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"),
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
@ -177,7 +186,8 @@ bridge_async_config(#{port := Port} = Config) ->
" health_check_interval = \"15s\"\n"
" max_buffer_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n"
" request_timeout = \"~s\"\n"
" request_timeout = \"~p\"\n"
" resume_interval = \"~s\"\n"
" start_after_created = \"true\"\n"
" start_timeout = \"5s\"\n"
" worker_pool_size = \"1\"\n"
@ -194,7 +204,8 @@ bridge_async_config(#{port := Port} = Config) ->
PoolSize,
RequestTimeout,
QueryMode,
ResourceRequestTimeout
ResourceRequestTimeout,
ResumeInterval
]
),
ct:pal(ConfigString),
@ -236,7 +247,7 @@ t_send_async_connection_timeout(_Config) ->
query_mode => "async",
connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000,
resouce_request_timeout => "infinity"
resource_request_timeout => "infinity"
}),
NumberOfMessagesToSend = 10,
[
@ -250,6 +261,97 @@ t_send_async_connection_timeout(_Config) ->
stop_http_server(Server),
ok.
t_async_free_retries(_Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}),
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "sync",
connect_timeout => 1_000,
request_timeout => 10_000,
resource_request_timeout => "10000s"
}),
%% Fail 5 times then succeed.
Context = #{error_attempts => 5},
ExpectedAttempts = 6,
Fn = fun(Get, Error) ->
?assertMatch(
{ok, 200, _, _},
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
#{error => Error}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
end,
do_t_async_retries(Context, {error, normal}, Fn),
do_t_async_retries(Context, {error, {shutdown, normal}}, Fn),
ok.
t_async_common_retries(_Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}),
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "sync",
resume_interval => "100ms",
connect_timeout => 1_000,
request_timeout => 10_000,
resource_request_timeout => "10000s"
}),
%% Keeps failing until connector gives up.
Context = #{error_attempts => infinity},
ExpectedAttempts = 3,
FnSucceed = fun(Get, Error) ->
?assertMatch(
{ok, 200, _, _},
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
end,
FnFail = fun(Get, Error) ->
?assertMatch(
Error,
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
end,
%% These two succeed because they're further retried by the buffer
%% worker synchronously, and we're not mock that call.
do_t_async_retries(Context, {error, {closed, "The connection was lost."}}, FnSucceed),
do_t_async_retries(Context, {error, {shutdown, closed}}, FnSucceed),
%% This fails because this error is treated as unrecoverable.
do_t_async_retries(Context, {error, something_else}, FnFail),
ok.
do_t_async_retries(TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext,
persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0),
on_exit(fun() -> persistent_term:erase({?MODULE, ?FUNCTION_NAME, attempts}) end),
Get = fun() -> persistent_term:get({?MODULE, ?FUNCTION_NAME, attempts}) end,
GetAndBump = fun() ->
Attempts = persistent_term:get({?MODULE, ?FUNCTION_NAME, attempts}),
persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, Attempts + 1),
Attempts + 1
end,
emqx_common_test_helpers:with_mock(
emqx_connector_http,
reply_delegator,
fun(Context, ReplyFunAndArgs, Result) ->
Attempts = GetAndBump(),
case Attempts > ErrorAttempts of
true ->
ct:pal("succeeding ~p : ~p", [Error, Attempts]),
meck:passthrough([Context, ReplyFunAndArgs, Result]);
false ->
ct:pal("failing ~p : ~p", [Error, Attempts]),
meck:passthrough([Context, ReplyFunAndArgs, Error])
end
end,
fun() -> Fn(Get, Error) end
),
ok.
receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 ->
ok;
receive_request_notifications(MessageIDs, ResponseDelay) ->

View File

@ -11,7 +11,6 @@ The application is used to connect EMQX and Cassandra. User can create a rule
and easily ingest IoT data into Cassandra by leveraging
[EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html).
<!---
# Documentation
@ -20,7 +19,6 @@ and easily ingest IoT data into Cassandra by leveraging
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)
for the EMQX rules engine introduction.
--->
# HTTP APIs

View File

@ -506,7 +506,17 @@ t_write_failure(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
QueryMode = ?config(query_mode, Config),
{ok, _} = create_bridge(Config),
{ok, _} = create_bridge(
Config,
#{
<<"resource_opts">> =>
#{
<<"auto_restart_interval">> => <<"100ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}
}
),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{
topic => atom_to_binary(?FUNCTION_NAME),
@ -523,7 +533,9 @@ t_write_failure(Config) ->
async ->
send_message(Config, SentData)
end,
#{?snk_kind := buffer_worker_flush_nack},
#{?snk_kind := Evt} when
Evt =:= buffer_worker_flush_nack orelse
Evt =:= buffer_worker_retry_inflight_failed,
10_000
)
end),

View File

@ -23,7 +23,7 @@ User can create a rule and easily ingest IoT data into ClickHouse by leveraging
- Several APIs are provided for bridge management, which includes create bridge,
update bridge, get bridge, stop or restart bridge and list bridges etc.
Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges)
- Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges)
for more detailed information.

View File

@ -0,0 +1 @@
clickhouse

View File

@ -0,0 +1,11 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.3"}}}
, {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}
]}.
{shell, [
{apps, [emqx_bridge_clickhouse]}
]}.

View File

@ -1,8 +1,8 @@
{application, emqx_bridge_clickhouse, [
{description, "EMQX Enterprise ClickHouse Bridge"},
{vsn, "0.1.0"},
{vsn, "0.2.0"},
{registered, []},
{applications, [kernel, stdlib]},
{applications, [kernel, stdlib, clickhouse, emqx_resource]},
{env, []},
{modules, []},
{links, []}

View File

@ -1,9 +1,8 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_clickhouse).
-module(emqx_bridge_clickhouse).
-include_lib("emqx_bridge/include/emqx_bridge.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
@ -101,7 +100,7 @@ fields("config") ->
}
)}
] ++
emqx_ee_connector_clickhouse:fields(config);
emqx_bridge_clickhouse_connector:fields(config);
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts");
fields("post") ->

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector_clickhouse).
-module(emqx_bridge_clickhouse_connector).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").

View File

@ -2,17 +2,17 @@
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_clickhouse_SUITE).
-module(emqx_bridge_clickhouse_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-define(APP, emqx_bridge_clickhouse).
-define(CLICKHOUSE_HOST, "clickhouse").
-define(CLICKHOUSE_RESOURCE_MOD, emqx_ee_connector_clickhouse).
-include_lib("emqx_connector/include/emqx_connector.hrl").
%% See comment in
%% lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl for how to
%% lib-ee/emqx_ee_connector/test/ee_bridge_clickhouse_connector_SUITE.erl for how to
%% run this without bringing up the whole CI infrastucture
%%------------------------------------------------------------------------------
@ -26,10 +26,7 @@ init_per_suite(Config) ->
true ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
{ok, _} = application:ensure_all_started(emqx_ee_connector),
{ok, _} = application:ensure_all_started(emqx_ee_bridge),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, ?APP]),
snabbkaffe:fix_ct_logging(),
%% Create the db table
Conn = start_clickhouse_connection(),
@ -76,11 +73,8 @@ start_clickhouse_connection() ->
end_per_suite(Config) ->
ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
clickhouse:stop(ClickhouseConnection),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector),
_ = application:stop(emqx_ee_connector),
_ = application:stop(emqx_bridge).
ok = emqx_connector_test_helpers:stop_apps([?APP, emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]).
init_per_testcase(_, Config) ->
reset_table(Config),

View File

@ -2,18 +2,18 @@
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector_clickhouse_SUITE).
-module(emqx_bridge_clickhouse_connector_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(APP, emqx_bridge_clickhouse).
-define(CLICKHOUSE_HOST, "clickhouse").
-define(CLICKHOUSE_RESOURCE_MOD, emqx_ee_connector_clickhouse).
-define(CLICKHOUSE_RESOURCE_MOD, emqx_bridge_clickhouse_connector).
%% This test SUITE requires a running clickhouse instance. If you don't want to
%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script
@ -21,7 +21,15 @@
%% from root of the EMQX directory.). You also need to set ?CLICKHOUSE_HOST and
%% ?CLICKHOUSE_PORT to appropriate values.
%%
%% docker run -d -p 18123:8123 -p19000:9000 --name some-clickhouse-server --ulimit nofile=262144:262144 -v "`pwd`/.ci/docker-compose-file/clickhouse/users.xml:/etc/clickhouse-server/users.xml" -v "`pwd`/.ci/docker-compose-file/clickhouse/config.xml:/etc/clickhouse-server/config.xml" clickhouse/clickhouse-server
%% docker run \
%% -d \
%% -p 18123:8123 \
%% -p 19000:9000 \
%% --name some-clickhouse-server \
%% --ulimit nofile=262144:262144 \
%% -v "`pwd`/.ci/docker-compose-file/clickhouse/users.xml:/etc/clickhouse-server/users.xml" \
%% -v "`pwd`/.ci/docker-compose-file/clickhouse/config.xml:/etc/clickhouse-server/config.xml" \
%% clickhouse/clickhouse-server
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -43,9 +51,7 @@ init_per_suite(Config) ->
of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
{ok, _} = application:ensure_all_started(emqx_ee_connector),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, ?APP]),
%% Create the db table
{ok, Conn} =
clickhouse:start_link([
@ -68,8 +74,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector).
ok = emqx_connector_test_helpers:stop_apps([?APP, emqx_resource]).
init_per_testcase(_, Config) ->
Config.
@ -119,7 +124,6 @@ perform_lifecycle_check(ResourceID, InitialConfig) ->
?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)),
% % Perform query as further check that the resource is working as expected
(fun() ->
erlang:display({pool_name, ResourceID}),
QueryNoParamsResWrapper = emqx_resource:query(ResourceID, test_query_no_params()),
?assertMatch({ok, _}, QueryNoParamsResWrapper),
{_, QueryNoParamsRes} = QueryNoParamsResWrapper,

View File

@ -1,6 +1,6 @@
# EMQX DynamoDB Bridge
[Dynamodb](https://aws.amazon.com/dynamodb/) is a high-performance NoSQL database
[DynamoDB](https://aws.amazon.com/dynamodb/) is a high-performance NoSQL database
service provided by Amazon that's designed for scalability and low-latency access
to structured data.

View File

@ -10,7 +10,7 @@ User can create a rule and easily ingest IoT data into GCP Pub/Sub by leveraging
# Documentation
- Refer to [Ingest data into GCP Pub/Sub](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-gcp-pubsub.html)
- Refer to [Ingest Data into GCP Pub/Sub](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-gcp-pubsub.html)
for how to use EMQX dashboard to ingest IoT data into GCP Pub/Sub.
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)

View File

@ -288,6 +288,7 @@ gcp_pubsub_config(Config) ->
" pipelining = ~b\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" metrics_flush_interval = 700ms\n"
" worker_pool_size = 1\n"
" query_mode = ~s\n"
" batch_size = ~b\n"
@ -529,12 +530,14 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
end.
receive_all_events(EventName, Timeout) ->
receive_all_events(EventName, Timeout, []).
receive_all_events(EventName, Timeout, _MaxEvents = 10, _Count = 0, _Acc = []).
receive_all_events(EventName, Timeout, Acc) ->
receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents ->
lists:reverse(Acc);
receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) ->
receive
{telemetry, #{name := [_, _, EventName]} = Event} ->
receive_all_events(EventName, Timeout, [Event | Acc])
receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc])
after Timeout ->
lists:reverse(Acc)
end.
@ -557,8 +560,9 @@ wait_n_events(_TelemetryTable, _ResourceId, NEvents, _Timeout, _EventName) when
ok;
wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) ->
receive
{telemetry, #{name := [_, _, EventName]}} ->
wait_n_events(TelemetryTable, ResourceId, NEvents - 1, Timeout, EventName)
{telemetry, #{name := [_, _, EventName], measurements := #{counter_inc := Inc}} = Event} ->
ct:pal("telemetry event: ~p", [Event]),
wait_n_events(TelemetryTable, ResourceId, NEvents - Inc, Timeout, EventName)
after Timeout ->
RecordedEvents = ets:tab2list(TelemetryTable),
CurrentMetrics = current_metrics(ResourceId),
@ -575,7 +579,6 @@ t_publish_success(Config) ->
ResourceId = ?config(resource_id, Config),
ServiceAccountJSON = ?config(service_account_json, Config),
TelemetryTable = ?config(telemetry_table, Config),
QueryMode = ?config(query_mode, Config),
Topic = <<"t/topic">>,
?check_trace(
create_bridge(Config),
@ -604,17 +607,6 @@ t_publish_success(Config) ->
),
%% to avoid test flakiness
wait_telemetry_event(TelemetryTable, success, ResourceId),
ExpectedInflightEvents =
case QueryMode of
sync -> 1;
async -> 3
end,
wait_telemetry_event(
TelemetryTable,
inflight,
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 0, 500),
assert_metrics(
@ -659,7 +651,6 @@ t_publish_success_local_topic(Config) ->
ResourceId = ?config(resource_id, Config),
ServiceAccountJSON = ?config(service_account_json, Config),
TelemetryTable = ?config(telemetry_table, Config),
QueryMode = ?config(query_mode, Config),
LocalTopic = <<"local/topic">>,
{ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}),
assert_empty_metrics(ResourceId),
@ -678,17 +669,6 @@ t_publish_success_local_topic(Config) ->
),
%% to avoid test flakiness
wait_telemetry_event(TelemetryTable, success, ResourceId),
ExpectedInflightEvents =
case QueryMode of
sync -> 1;
async -> 3
end,
wait_telemetry_event(
TelemetryTable,
inflight,
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 0, 500),
assert_metrics(
@ -720,7 +700,6 @@ t_publish_templated(Config) ->
ResourceId = ?config(resource_id, Config),
ServiceAccountJSON = ?config(service_account_json, Config),
TelemetryTable = ?config(telemetry_table, Config),
QueryMode = ?config(query_mode, Config),
Topic = <<"t/topic">>,
PayloadTemplate = <<
"{\"payload\": \"${payload}\","
@ -766,17 +745,6 @@ t_publish_templated(Config) ->
),
%% to avoid test flakiness
wait_telemetry_event(TelemetryTable, success, ResourceId),
ExpectedInflightEvents =
case QueryMode of
sync -> 1;
async -> 3
end,
wait_telemetry_event(
TelemetryTable,
inflight,
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 0, 500),
assert_metrics(
@ -1113,9 +1081,6 @@ do_econnrefused_or_timeout_test(Config, Error) ->
%% message as dropped; and since it never considers the
%% response expired, this succeeds.
econnrefused ->
wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
timeout => 10_000, n_events => 1
}),
%% even waiting, hard to avoid flakiness... simpler to just sleep
%% a bit until stabilization.
ct:sleep(200),
@ -1135,8 +1100,8 @@ do_econnrefused_or_timeout_test(Config, Error) ->
CurrentMetrics
);
timeout ->
wait_until_gauge_is(inflight, 0, _Timeout = 400),
wait_until_gauge_is(queuing, 0, _Timeout = 400),
wait_until_gauge_is(inflight, 0, _Timeout = 1_000),
wait_until_gauge_is(queuing, 0, _Timeout = 1_000),
assert_metrics(
#{
dropped => 0,

View File

@ -15,7 +15,7 @@ easily ingest IoT data into InfluxDB by leveraging
# Documentation
- Refer to [Ingest data into InfluxDB](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-influxdb.html)
- Refer to [Ingest Data into InfluxDB](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-influxdb.html)
for how to use EMQX dashboard to ingest IoT data into InfluxDB.
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)

View File

@ -12,13 +12,15 @@ It implements the connection management and interaction without need for a
For more information on Apache IoTDB, please see its [official
site](https://iotdb.apache.org/).
<!---
# Configurations
Please see [our official
documentation](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-iotdb.html)
for more detailed info.
--->
# Contributing - [Mandatory]
# Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
# License

View File

@ -16,7 +16,7 @@ For more information about Apache Kafka, please see its [official site](https://
# Configurations
Please see [Ingest data into Kafka](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-kafka.html) for more detailed info.
Please see [Ingest Data into Kafka](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-kafka.html) for more detailed info.
# Contributing

View File

@ -1074,7 +1074,7 @@ cluster(Config) ->
{priv_data_dir, PrivDataDir},
{load_schema, true},
{start_autocluster, true},
{schema_mod, emqx_ee_conf_schema},
{schema_mod, emqx_enterprise_schema},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, [broker, router]),

View File

@ -13,7 +13,7 @@ User can create a rule and easily ingest IoT data into MongoDB by leveraging
# Documentation
- Refer to [Ingest data into MongoDB](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-mongodb.html)
- Refer to [Ingest Data into MongoDB](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-mongodb.html)
for how to use EMQX dashboard to ingest IoT data into MongoDB.
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)

View File

@ -10,7 +10,7 @@ User can create a rule and easily ingest IoT data into MySQL by leveraging
# Documentation
- Refer to [Ingest data into MySQL](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-mysql.html)
- Refer to [Ingest Data into MySQL](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-mysql.html)
for how to use EMQX dashboard to ingest IoT data into MySQL.
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)

View File

@ -1,6 +1,6 @@
# EMQX PostgreSQL Bridge
[PostgreSQL](https://github.com/PostgreSQL/PostgreSQL) is an open-source relational
[PostgreSQL](https://www.postgresql.org/) is an open-source relational
database management system (RDBMS) that uses and extends the SQL language.
It is known for its reliability, data integrity, and advanced features such as
support for JSON, XML, and other data formats.
@ -12,7 +12,7 @@ User can create a rule and easily ingest IoT data into PostgreSQL by leveraging
# Documentation
- Refer to [Ingest data into PostgreSQL](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-pgsql.html)
- Refer to [Ingest Data into PostgreSQL](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-pgsql.html)
for how to use EMQX dashboard to ingest IoT data into PostgreSQL.
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)

View File

@ -258,13 +258,18 @@ query_resource(Config, Request) ->
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
query_resource_async(Config, Request) ->
query_resource_async(Config, Request, _Opts = #{}).
query_resource_async(Config, Request, Opts) ->
Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config),
Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Timeout = maps:get(timeout, Opts, 500),
Return = emqx_resource:query(ResourceID, Request, #{
timeout => 500, async_reply_fun => {AsyncReplyFun, []}
timeout => Timeout,
async_reply_fun => {AsyncReplyFun, []}
}),
{Return, Ref}.
@ -498,9 +503,9 @@ t_write_timeout(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"request_timeout">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
<<"auto_restart_interval">> => <<"100ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}
}
),
@ -515,7 +520,7 @@ t_write_timeout(Config) ->
Res1 =
case QueryMode of
async ->
query_resource_async(Config, {send_message, SentData});
query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000});
sync ->
query_resource(Config, {send_message, SentData})
end,
@ -526,7 +531,17 @@ t_write_timeout(Config) ->
{_, Ref} when is_reference(Ref) ->
case receive_result(Ref, 15_000) of
{ok, Res} ->
?assertMatch({error, {unrecoverable_error, _}}, Res);
%% we may receive a successful result depending on
%% timing, if the request is retried after the
%% failure is healed.
case Res of
{error, {unrecoverable_error, _}} ->
ok;
{ok, _} ->
ok;
_ ->
ct:fail("unexpected result: ~p", [Res])
end;
timeout ->
ct:pal("mailbox:\n ~p", [process_info(self(), messages)]),
ct:fail("no response received")

View File

@ -15,11 +15,13 @@ used by authentication and authorization applications.
For more information on Apache Pulsar, please see its [official
site](https://pulsar.apache.org/).
<!---
# Configurations
Please see [our official
documentation](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-pulsar.html)
for more detailed info.
--->
# Contributing

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{registered, []},
{applications, [
kernel,

View File

@ -81,6 +81,7 @@ on_start(InstanceId, Config) ->
} = Config,
Servers = format_servers(Servers0),
ClientId = make_client_id(InstanceId, BridgeName),
ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId),
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
ClientOpts = #{
@ -116,15 +117,29 @@ on_start(InstanceId, Config) ->
start_producer(Config, InstanceId, ClientId, ClientOpts).
-spec on_stop(resource_id(), state()) -> ok.
on_stop(_InstanceId, State) ->
#{
pulsar_client_id := ClientId,
producers := Producers
} = State,
stop_producers(ClientId, Producers),
stop_client(ClientId),
?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}),
ok.
on_stop(InstanceId, _State) ->
case emqx_resource:get_allocated_resources(InstanceId) of
#{pulsar_client_id := ClientId, pulsar_producers := Producers} ->
stop_producers(ClientId, Producers),
stop_client(ClientId),
?tp(pulsar_bridge_stopped, #{
instance_id => InstanceId,
pulsar_client_id => ClientId,
pulsar_producers => Producers
}),
ok;
#{pulsar_client_id := ClientId} ->
stop_client(ClientId),
?tp(pulsar_bridge_stopped, #{
instance_id => InstanceId,
pulsar_client_id => ClientId,
pulsar_producers => undefined
}),
ok;
_ ->
?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}),
ok
end.
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
on_get_status(_InstanceId, State = #{}) ->
@ -325,6 +340,8 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
{ok, Producers} ->
ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers),
?tp(pulsar_producer_producers_allocated, #{}),
State = #{
pulsar_client_id => ClientId,
producers => Producers,

View File

@ -43,7 +43,9 @@ only_once_tests() ->
t_send_when_down,
t_send_when_timeout,
t_failure_to_start_producer,
t_producer_process_crash
t_producer_process_crash,
t_resource_manager_crash_after_producers_started,
t_resource_manager_crash_before_producers_started
].
init_per_suite(Config) ->
@ -429,7 +431,19 @@ wait_until_producer_connected() ->
wait_until_connected(pulsar_producers_sup, pulsar_producer).
wait_until_connected(SupMod, Mod) ->
Pids = [
Pids = get_pids(SupMod, Mod),
?retry(
_Sleep = 300,
_Attempts0 = 20,
lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
),
ok.
get_pulsar_producers() ->
get_pids(pulsar_producers_sup, pulsar_producer).
get_pids(SupMod, Mod) ->
[
P
|| {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod),
P <- element(2, process_info(SupPid, links)),
@ -437,13 +451,7 @@ wait_until_connected(SupMod, Mod) ->
{Mod, init, _} -> true;
_ -> false
end
],
?retry(
_Sleep = 300,
_Attempts0 = 20,
lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
),
ok.
].
create_rule_and_action_http(Config) ->
PulsarName = ?config(pulsar_name, Config),
@ -496,7 +504,7 @@ cluster(Config) ->
{priv_data_dir, PrivDataDir},
{load_schema, true},
{start_autocluster, true},
{schema_mod, emqx_ee_conf_schema},
{schema_mod, emqx_enterprise_schema},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, [broker, router]),
@ -528,6 +536,18 @@ start_cluster(Cluster) ->
end),
Nodes.
kill_resource_managers() ->
ct:pal("gonna kill resource managers"),
lists:foreach(
fun({_, Pid, _, _}) ->
ct:pal("terminating resource manager ~p", [Pid]),
%% sys:terminate(Pid, stop),
exit(Pid, kill),
ok
end,
supervisor:which_children(emqx_resource_manager_sup)
).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -921,7 +941,11 @@ t_producer_process_crash(Config) ->
ok
after 1_000 -> ct:fail("pid didn't die")
end,
?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)),
?retry(
_Sleep0 = 50,
_Attempts0 = 50,
?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId))
),
%% Should recover given enough time.
?retry(
_Sleep = 1_000,
@ -952,6 +976,69 @@ t_producer_process_crash(Config) ->
),
ok.
t_resource_manager_crash_after_producers_started(Config) ->
?check_trace(
begin
?force_ordering(
#{?snk_kind := pulsar_producer_producers_allocated},
#{?snk_kind := will_kill_resource_manager}
),
?force_ordering(
#{?snk_kind := resource_manager_killed},
#{?snk_kind := pulsar_producer_bridge_started}
),
spawn_link(fun() ->
?tp(will_kill_resource_manager, #{}),
kill_resource_managers(),
?tp(resource_manager_killed, #{}),
ok
end),
%% even if the resource manager is dead, we can still
%% clear the allocated resources.
{{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := Producers} when
Producers =/= undefined,
10_000
),
ok
end,
[]
),
ok.
t_resource_manager_crash_before_producers_started(Config) ->
?check_trace(
begin
?force_ordering(
#{?snk_kind := pulsar_producer_capture_name},
#{?snk_kind := will_kill_resource_manager}
),
?force_ordering(
#{?snk_kind := resource_manager_killed},
#{?snk_kind := pulsar_producer_about_to_start_producers}
),
spawn_link(fun() ->
?tp(will_kill_resource_manager, #{}),
kill_resource_managers(),
?tp(resource_manager_killed, #{}),
ok
end),
%% even if the resource manager is dead, we can still
%% clear the allocated resources.
{{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},
10_000
),
ok
end,
[]
),
ok.
t_cluster(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),

View File

@ -21,8 +21,10 @@ and easily ingest IoT data into RabbitMQ by leveraging
# Documentation
<!---
- Refer to the [RabbitMQ bridge documentation](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-rabbitmq.html)
for how to use EMQX dashboard to ingest IoT data into RabbitMQ.
--->
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)
for an introduction to the EMQX rules engine.

View File

@ -11,7 +11,7 @@ User can create a rule and easily ingest IoT data into Redis by leveraging
# Documentation
- Refer to [Ingest data into Redis](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-redis.html)
- Refer to [Ingest Data into Redis](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-redis.html)
for how to use EMQX dashboard to ingest IoT data into Redis.
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)

View File

@ -11,7 +11,7 @@ User can create a rule and easily ingest IoT data into RocketMQ by leveraging
# Documentation
- Refer to [Ingest data into RocketMQ](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-rocketmq.html)
- Refer to [Ingest Data into RocketMQ](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-rocketmq.html)
for how to use EMQX dashboard to ingest IoT data into RocketMQ.
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)

View File

@ -16,7 +16,7 @@ For more information about Microsoft SQL Server, please see the [official site](
# Configurations
Please see [Ingest data into SQL Server](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-sqlserver.html) for more detailed information.
Please see [Ingest Data into SQL Server](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-sqlserver.html) for more detailed information.
# HTTP APIs

View File

@ -13,7 +13,7 @@ User can create a rule and easily ingest IoT data into TDEngine by leveraging
# Documentation
- Refer to [Ingest data into TDEngine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-tdengine.html)
- Refer to [Ingest Data into TDEngine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/data-bridge-tdengine.html)
for how to use EMQX dashboard to ingest IoT data into TDEngine.
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)

View File

@ -38,8 +38,8 @@
]).
start_link() ->
MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100),
CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000),
MaxHistory = emqx_conf:get([node, cluster_call, max_history], 100),
CleanupMs = emqx_conf:get([node, cluster_call, cleanup_interval], 5 * 60 * 1000),
start_link(MaxHistory, CleanupMs).
start_link(MaxHistory, CleanupMs) ->

View File

@ -1,6 +1,6 @@
{application, emqx_conf, [
{description, "EMQX configuration management"},
{vsn, "0.1.20"},
{vsn, "0.1.21"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib, emqx_ctl]},

View File

@ -49,10 +49,10 @@
-define(MERGED_CONFIGS, [
emqx_bridge_schema,
emqx_retainer_schema,
emqx_statsd_schema,
emqx_authn_schema,
emqx_authz_schema,
emqx_auto_subscribe_schema,
{emqx_telemetry_schema, ce},
emqx_modules_schema,
emqx_plugins_schema,
emqx_dashboard_schema,
@ -109,11 +109,25 @@ roots() ->
] ++
emqx_schema:roots(medium) ++
emqx_schema:roots(low) ++
lists:flatmap(fun roots/1, ?MERGED_CONFIGS).
lists:flatmap(fun roots/1, common_apps()).
validations() ->
hocon_schema:validations(emqx_schema) ++
lists:flatmap(fun hocon_schema:validations/1, ?MERGED_CONFIGS).
lists:flatmap(fun hocon_schema:validations/1, common_apps()).
common_apps() ->
Edition = emqx_release:edition(),
lists:filtermap(
fun
({N, E}) ->
case E =:= Edition of
true -> {true, N};
false -> false
end;
(N) when is_atom(N) -> {true, N}
end,
?MERGED_CONFIGS
).
fields("cluster") ->
[
@ -561,7 +575,7 @@ fields("node") ->
emqx_schema:comma_separated_atoms(),
#{
mapping => "emqx_machine.applications",
default => [],
default => <<"">>,
'readOnly' => true,
importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC(node_applications)
@ -688,11 +702,12 @@ fields("rpc") ->
desc => ?DESC(rpc_mode)
}
)},
{"driver",
{"protocol",
sc(
hoconsc:enum([tcp, ssl]),
#{
mapping => "gen_rpc.driver",
aliases => [driver],
default => tcp,
desc => ?DESC(rpc_driver)
}
@ -870,19 +885,22 @@ fields("rpc") ->
];
fields("log") ->
[
{"console_handler",
{"console",
sc(?R_REF("console_handler"), #{
aliases => [console_handler],
importance => ?IMPORTANCE_HIGH
})},
{"file",
sc(
?R_REF("console_handler"),
#{importance => ?IMPORTANCE_HIGH}
)},
{"file_handlers",
sc(
map(name, ?R_REF("log_file_handler")),
?UNION([
?R_REF("log_file_handler"),
?MAP(handler_name, ?R_REF("log_file_handler"))
]),
#{
desc => ?DESC("log_file_handlers"),
%% because file_handlers is a map
%% so there has to be a default value in order to populate the raw configs
default => #{<<"default">> => #{<<"level">> => <<"warning">>}},
converter => fun ensure_file_handlers/2,
default => #{<<"level">> => <<"warning">>},
aliases => [file_handlers],
importance => ?IMPORTANCE_HIGH
}
)}
@ -891,51 +909,41 @@ fields("console_handler") ->
log_handler_common_confs(console);
fields("log_file_handler") ->
[
{"file",
{"to",
sc(
file(),
#{
desc => ?DESC("log_file_handler_file"),
default => <<"${EMQX_LOG_DIR}/emqx.log">>,
aliases => [file],
importance => ?IMPORTANCE_HIGH,
converter => fun(Path, Opts) ->
emqx_schema:naive_env_interpolation(ensure_unicode_path(Path, Opts))
end,
default => <<"${EMQX_LOG_DIR}/emqx.log">>
end
}
)},
{"rotation",
{"rotation_count",
sc(
?R_REF("log_rotation"),
#{}
range(1, 128),
#{
aliases => [rotation],
default => 10,
converter => fun convert_rotation/2,
desc => ?DESC("log_rotation_count"),
importance => ?IMPORTANCE_MEDIUM
}
)},
{"max_size",
{"rotation_size",
sc(
hoconsc:union([infinity, emqx_schema:bytesize()]),
#{
default => <<"50MB">>,
desc => ?DESC("log_file_handler_max_size"),
aliases => [max_size],
importance => ?IMPORTANCE_MEDIUM
}
)}
] ++ log_handler_common_confs(file);
fields("log_rotation") ->
[
{"enable",
sc(
boolean(),
#{
default => true,
desc => ?DESC("log_rotation_enable")
}
)},
{"count",
sc(
range(1, 2048),
#{
default => 10,
desc => ?DESC("log_rotation_count")
}
)}
];
fields("log_overload_kill") ->
[
{"enable",
@ -1043,8 +1051,8 @@ translation("ekka") ->
[{"cluster_discovery", fun tr_cluster_discovery/1}];
translation("kernel") ->
[
{"logger_level", fun tr_logger_level/1},
{"logger", fun tr_logger_handlers/1},
{"logger_level", fun emqx_config_logger:tr_level/1},
{"logger", fun emqx_config_logger:tr_handlers/1},
{"error_logger", fun(_) -> silent end}
];
translation("emqx") ->
@ -1118,24 +1126,9 @@ tr_cluster_discovery(Conf) ->
Strategy = conf_get("cluster.discovery_strategy", Conf),
{Strategy, filter(cluster_options(Strategy, Conf))}.
-spec tr_logger_level(hocon:config()) -> logger:level().
tr_logger_level(Conf) ->
emqx_config_logger:tr_level(Conf).
tr_logger_handlers(Conf) ->
emqx_config_logger:tr_handlers(Conf).
log_handler_common_confs(Handler) ->
lists:map(
fun
({_Name, #{importance := _}} = F) -> F;
({Name, Sc}) -> {Name, Sc#{importance => ?IMPORTANCE_LOW}}
end,
do_log_handler_common_confs(Handler)
).
do_log_handler_common_confs(Handler) ->
%% we rarely support dynamic defaults like this
%% for this one, we have build-time defualut the same as runtime default
%% for this one, we have build-time default the same as runtime default
%% so it's less tricky
EnableValues =
case Handler of
@ -1145,21 +1138,31 @@ do_log_handler_common_confs(Handler) ->
EnvValue = os:getenv("EMQX_DEFAULT_LOG_HANDLER"),
Enable = lists:member(EnvValue, EnableValues),
[
{"level",
sc(
log_level(),
#{
default => warning,
desc => ?DESC("common_handler_level"),
importance => ?IMPORTANCE_HIGH
}
)},
{"enable",
sc(
boolean(),
#{
default => Enable,
desc => ?DESC("common_handler_enable"),
importance => ?IMPORTANCE_LOW
importance => ?IMPORTANCE_MEDIUM
}
)},
{"level",
{"formatter",
sc(
log_level(),
hoconsc:enum([text, json]),
#{
default => warning,
desc => ?DESC("common_handler_level")
default => text,
desc => ?DESC("common_handler_formatter"),
importance => ?IMPORTANCE_MEDIUM
}
)},
{"time_offset",
@ -1178,16 +1181,7 @@ do_log_handler_common_confs(Handler) ->
#{
default => unlimited,
desc => ?DESC("common_handler_chars_limit"),
importance => ?IMPORTANCE_LOW
}
)},
{"formatter",
sc(
hoconsc:enum([text, json]),
#{
default => text,
desc => ?DESC("common_handler_formatter"),
importance => ?IMPORTANCE_MEDIUM
importance => ?IMPORTANCE_HIDDEN
}
)},
{"single_line",
@ -1196,7 +1190,7 @@ do_log_handler_common_confs(Handler) ->
#{
default => true,
desc => ?DESC("common_handler_single_line"),
importance => ?IMPORTANCE_LOW
importance => ?IMPORTANCE_HIDDEN
}
)},
{"sync_mode_qlen",
@ -1204,7 +1198,8 @@ do_log_handler_common_confs(Handler) ->
non_neg_integer(),
#{
default => 100,
desc => ?DESC("common_handler_sync_mode_qlen")
desc => ?DESC("common_handler_sync_mode_qlen"),
importance => ?IMPORTANCE_HIDDEN
}
)},
{"drop_mode_qlen",
@ -1212,7 +1207,8 @@ do_log_handler_common_confs(Handler) ->
pos_integer(),
#{
default => 3000,
desc => ?DESC("common_handler_drop_mode_qlen")
desc => ?DESC("common_handler_drop_mode_qlen"),
importance => ?IMPORTANCE_HIDDEN
}
)},
{"flush_qlen",
@ -1220,17 +1216,19 @@ do_log_handler_common_confs(Handler) ->
pos_integer(),
#{
default => 8000,
desc => ?DESC("common_handler_flush_qlen")
desc => ?DESC("common_handler_flush_qlen"),
importance => ?IMPORTANCE_HIDDEN
}
)},
{"overload_kill", sc(?R_REF("log_overload_kill"), #{})},
{"burst_limit", sc(?R_REF("log_burst_limit"), #{})},
{"overload_kill", sc(?R_REF("log_overload_kill"), #{importance => ?IMPORTANCE_HIDDEN})},
{"burst_limit", sc(?R_REF("log_burst_limit"), #{importance => ?IMPORTANCE_HIDDEN})},
{"supervisor_reports",
sc(
hoconsc:enum([error, progress]),
#{
default => error,
desc => ?DESC("common_handler_supervisor_reports")
desc => ?DESC("common_handler_supervisor_reports"),
importance => ?IMPORTANCE_HIDDEN
}
)},
{"max_depth",
@ -1238,7 +1236,8 @@ do_log_handler_common_confs(Handler) ->
hoconsc:union([unlimited, non_neg_integer()]),
#{
default => 100,
desc => ?DESC("common_handler_max_depth")
desc => ?DESC("common_handler_max_depth"),
importance => ?IMPORTANCE_HIDDEN
}
)}
].
@ -1356,6 +1355,22 @@ validator_string_re(Val, RE, Error) ->
node_array() ->
hoconsc:union([emqx_schema:comma_separated_atoms(), hoconsc:array(atom())]).
ensure_file_handlers(Conf, _Opts) ->
FileFields = lists:flatmap(
fun({F, Schema}) ->
Alias = [atom_to_binary(A) || A <- maps:get(aliases, Schema, [])],
[list_to_binary(F) | Alias]
end,
fields("log_file_handler")
),
HandlersWithoutName = maps:with(FileFields, Conf),
HandlersWithName = maps:without(FileFields, Conf),
emqx_utils_maps:deep_merge(#{<<"default">> => HandlersWithoutName}, HandlersWithName).
convert_rotation(undefined, _Opts) -> undefined;
convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10);
convert_rotation(Count, _Opts) when is_integer(Count) -> Count.
ensure_unicode_path(undefined, _) ->
undefined;
ensure_unicode_path(Path, #{make_serializable := true}) ->

View File

@ -48,6 +48,200 @@ array_nodes_test() ->
),
ok.
%% erlfmt-ignore
-define(OUTDATED_LOG_CONF,
"""
log.console_handler {
burst_limit {
enable = true
max_count = 10000
window_time = 1000
}
chars_limit = unlimited
drop_mode_qlen = 3000
enable = true
flush_qlen = 8000
formatter = text
level = warning
max_depth = 100
overload_kill {
enable = true
mem_size = 31457280
qlen = 20000
restart_after = 5000
}
single_line = true
supervisor_reports = error
sync_mode_qlen = 100
time_offset = \"+02:00\"
}
log.file_handlers {
default {
burst_limit {
enable = true
max_count = 10000
window_time = 1000
}
chars_limit = unlimited
drop_mode_qlen = 3000
enable = true
file = \"log/my-emqx.log\"
flush_qlen = 8000
formatter = text
level = debug
max_depth = 100
max_size = \"1024MB\"
overload_kill {
enable = true
mem_size = 31457280
qlen = 20000
restart_after = 5000
}
rotation {count = 20, enable = true}
single_line = true
supervisor_reports = error
sync_mode_qlen = 100
time_offset = \"+01:00\"
}
}
"""
).
-define(FORMATTER(TimeOffset),
{emqx_logger_textfmt, #{
chars_limit => unlimited,
depth => 100,
single_line => true,
template => [time, " [", level, "] ", msg, "\n"],
time_offset => TimeOffset
}}
).
-define(FILTERS, [{drop_progress_reports, {fun logger_filters:progress/2, stop}}]).
-define(LOG_CONFIG, #{
burst_limit_enable => true,
burst_limit_max_count => 10000,
burst_limit_window_time => 1000,
drop_mode_qlen => 3000,
flush_qlen => 8000,
overload_kill_enable => true,
overload_kill_mem_size => 31457280,
overload_kill_qlen => 20000,
overload_kill_restart_after => 5000,
sync_mode_qlen => 100
}).
outdated_log_test() ->
validate_log(?OUTDATED_LOG_CONF).
validate_log(Conf) ->
ensure_acl_conf(),
BaseConf = to_bin(?BASE_CONF, ["emqx1@127.0.0.1", "emqx1@127.0.0.1"]),
Conf0 = <<BaseConf/binary, (list_to_binary(Conf))/binary>>,
{ok, ConfMap0} = hocon:binary(Conf0, #{format => richmap}),
ConfList = hocon_tconf:generate(emqx_conf_schema, ConfMap0),
Kernel = proplists:get_value(kernel, ConfList),
?assertEqual(silent, proplists:get_value(error_logger, Kernel)),
?assertEqual(debug, proplists:get_value(logger_level, Kernel)),
Loggers = proplists:get_value(logger, Kernel),
FileHandler = lists:keyfind(logger_disk_log_h, 3, Loggers),
?assertEqual(
{handler, default, logger_disk_log_h, #{
config => ?LOG_CONFIG#{
type => wrap,
file => "log/my-emqx.log",
max_no_bytes => 1073741824,
max_no_files => 20
},
filesync_repeat_interval => no_repeat,
filters => ?FILTERS,
formatter => ?FORMATTER("+01:00"),
level => debug
}},
FileHandler
),
ConsoleHandler = lists:keyfind(logger_std_h, 3, Loggers),
?assertEqual(
{handler, console, logger_std_h, #{
config => ?LOG_CONFIG#{type => standard_io},
filters => ?FILTERS,
formatter => ?FORMATTER("+02:00"),
level => warning
}},
ConsoleHandler
).
%% erlfmt-ignore
-define(KERNEL_LOG_CONF,
"""
log.console {
enable = true
formatter = text
level = warning
time_offset = \"+02:00\"
}
log.file {
enable = false
file = \"log/xx-emqx.log\"
formatter = text
level = debug
rotation_count = 20
rotation_size = \"1024MB\"
time_offset = \"+01:00\"
}
log.file_handlers.default {
enable = true
file = \"log/my-emqx.log\"
}
"""
).
log_test() ->
validate_log(?KERNEL_LOG_CONF).
%% erlfmt-ignore
log_rotation_count_limit_test() ->
ensure_acl_conf(),
Format =
"""
log.file {
enable = true
to = \"log/emqx.log\"
formatter = text
level = debug
rotation = {count = ~w}
rotation_size = \"1024MB\"
}
""",
BaseConf = to_bin(?BASE_CONF, ["emqx1@127.0.0.1", "emqx1@127.0.0.1"]),
lists:foreach(fun({Conf, Count}) ->
Conf0 = <<BaseConf/binary, Conf/binary>>,
{ok, ConfMap0} = hocon:binary(Conf0, #{format => richmap}),
ConfList = hocon_tconf:generate(emqx_conf_schema, ConfMap0),
Kernel = proplists:get_value(kernel, ConfList),
Loggers = proplists:get_value(logger, Kernel),
?assertMatch(
{handler, default, logger_disk_log_h, #{
config := #{max_no_files := Count}
}},
lists:keyfind(logger_disk_log_h, 3, Loggers)
)
end,
[{to_bin(Format, [1]), 1}, {to_bin(Format, [128]), 128}]),
lists:foreach(fun({Conf, Count}) ->
Conf0 = <<BaseConf/binary, Conf/binary>>,
{ok, ConfMap0} = hocon:binary(Conf0, #{format => richmap}),
?assertThrow({emqx_conf_schema,
[#{kind := validation_error,
mismatches := #{"handler_name" :=
#{kind := validation_error,
path := "log.file.default.rotation_count",
reason := #{expected_type := "1..128"},
value := Count}
}}]},
hocon_tconf:generate(emqx_conf_schema, ConfMap0))
end, [{to_bin(Format, [0]), 0}, {to_bin(Format, [129]), 129}]).
%% erlfmt-ignore
-define(BASE_AUTHN_ARRAY,
"""
@ -86,36 +280,44 @@ authn_validations_test() ->
OKHttps = to_bin(?BASE_AUTHN_ARRAY, [post, true, <<"https://127.0.0.1:8080">>]),
Conf0 = <<BaseConf/binary, OKHttps/binary>>,
{ok, ConfMap0} = hocon:binary(Conf0, #{format => richmap}),
?assert(is_list(hocon_tconf:generate(emqx_conf_schema, ConfMap0))),
{_, Res0} = hocon_tconf:map_translate(emqx_conf_schema, ConfMap0, #{format => richmap}),
Headers0 = authentication_headers(Res0),
?assertEqual(<<"application/json">>, maps:get(<<"content-type">>, Headers0)),
%% accept from converter
?assertEqual(<<"application/json">>, maps:get(<<"accept">>, Headers0)),
OKHttp = to_bin(?BASE_AUTHN_ARRAY, [post, false, <<"http://127.0.0.1:8080">>]),
Conf1 = <<BaseConf/binary, OKHttp/binary>>,
{ok, ConfMap1} = hocon:binary(Conf1, #{format => richmap}),
?assert(is_list(hocon_tconf:generate(emqx_conf_schema, ConfMap1))),
{_, Res1} = hocon_tconf:map_translate(emqx_conf_schema, ConfMap1, #{format => richmap}),
Headers1 = authentication_headers(Res1),
?assertEqual(<<"application/json">>, maps:get(<<"content-type">>, Headers1), Headers1),
?assertEqual(<<"application/json">>, maps:get(<<"accept">>, Headers1), Headers1),
DisableSSLWithHttps = to_bin(?BASE_AUTHN_ARRAY, [post, false, <<"https://127.0.0.1:8080">>]),
Conf2 = <<BaseConf/binary, DisableSSLWithHttps/binary>>,
{ok, ConfMap2} = hocon:binary(Conf2, #{format => richmap}),
?assertThrow(
?ERROR(check_http_ssl_opts),
hocon_tconf:generate(emqx_conf_schema, ConfMap2)
hocon_tconf:map_translate(emqx_conf_schema, ConfMap2, #{format => richmap})
),
BadHeader = to_bin(?BASE_AUTHN_ARRAY, [get, true, <<"https://127.0.0.1:8080">>]),
Conf3 = <<BaseConf/binary, BadHeader/binary>>,
{ok, ConfMap3} = hocon:binary(Conf3, #{format => richmap}),
?assertThrow(
?ERROR(check_http_headers),
hocon_tconf:generate(emqx_conf_schema, ConfMap3)
),
{_, Res3} = hocon_tconf:map_translate(emqx_conf_schema, ConfMap3, #{format => richmap}),
Headers3 = authentication_headers(Res3),
%% remove the content-type header when get method
?assertEqual(false, maps:is_key(<<"content-type">>, Headers3), Headers3),
?assertEqual(<<"application/json">>, maps:get(<<"accept">>, Headers3), Headers3),
BadHeaderWithTuple = binary:replace(BadHeader, [<<"[">>, <<"]">>], <<"">>, [global]),
Conf4 = <<BaseConf/binary, BadHeaderWithTuple/binary>>,
{ok, ConfMap4} = hocon:binary(Conf4, #{format => richmap}),
?assertThrow(
?ERROR(check_http_headers),
hocon_tconf:generate(emqx_conf_schema, ConfMap4)
),
{_, Res4} = hocon_tconf:map_translate(emqx_conf_schema, ConfMap4, #{}),
Headers4 = authentication_headers(Res4),
?assertEqual(false, maps:is_key(<<"content-type">>, Headers4), Headers4),
?assertEqual(<<"application/json">>, maps:get(<<"accept">>, Headers4), Headers4),
ok.
%% erlfmt-ignore
@ -200,6 +402,10 @@ listeners_test() ->
),
ok.
authentication_headers(Conf) ->
[#{<<"headers">> := Headers}] = hocon_maps:get("authentication", Conf),
Headers.
doc_gen_test() ->
ensure_acl_conf(),
%% the json file too large to encode.
@ -238,7 +444,7 @@ log_path_test_() ->
#{<<"log">> => #{<<"file_handlers">> => #{<<"name1">> => #{<<"file">> => Path}}}}
end,
Assert = fun(Name, Path, Conf) ->
?assertMatch(#{log := #{file_handlers := #{Name := #{file := Path}}}}, Conf)
?assertMatch(#{log := #{file := #{Name := #{to := Path}}}}, Conf)
end,
[
@ -251,7 +457,15 @@ log_path_test_() ->
{emqx_conf_schema, [
#{
kind := validation_error,
reason := {"bad_file_path_string", _}
mismatches :=
#{
"handler_name" :=
#{
kind := validation_error,
path := "log.file.name1.to",
reason := {"bad_file_path_string", _}
}
}
}
]},
check(Fh(<<239, 32, 132, 47, 117, 116, 102, 56>>))
@ -262,7 +476,15 @@ log_path_test_() ->
{emqx_conf_schema, [
#{
kind := validation_error,
reason := {"not_string", _}
mismatches :=
#{
"handler_name" :=
#{
kind := validation_error,
path := "log.file.name1.to",
reason := {"not_string", _}
}
}
}
]},
check(Fh(#{<<"foo">> => <<"bar">>}))

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
{vsn, "0.1.23"},
{vsn, "0.1.24"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [

View File

@ -32,22 +32,17 @@
on_query/3,
on_query_async/4,
on_get_status/2,
reply_delegator/2
reply_delegator/3
]).
-type url() :: emqx_http_lib:uri_map().
-reflect_type([url/0]).
-typerefl_from_string({url/0, emqx_http_lib, uri_parse}).
-export([
roots/0,
fields/1,
desc/1,
validations/0,
namespace/0
]).
-export([check_ssl_opts/2, validate_method/1, join_paths/2]).
-export([validate_method/1, join_paths/2]).
-type connect_timeout() :: emqx_schema:duration() | infinity.
-type pool_type() :: random | hash.
@ -70,20 +65,6 @@ roots() ->
fields(config) ->
[
{base_url,
sc(
url(),
#{
required => true,
validator => fun
(#{query := _Query}) ->
{error, "There must be no query in the base_url"};
(_) ->
ok
end,
desc => ?DESC("base_url")
}
)},
{connect_timeout,
sc(
emqx_schema:duration_ms(),
@ -172,9 +153,6 @@ desc("request") ->
desc(_) ->
undefined.
validations() ->
[{check_ssl_opts, fun check_ssl_opts/1}].
validate_method(M) when M =:= <<"post">>; M =:= <<"put">>; M =:= <<"get">>; M =:= <<"delete">> ->
ok;
validate_method(M) ->
@ -268,10 +246,11 @@ on_query(InstId, {send_message, Msg}, State) ->
request_timeout := Timeout
} = process_request(Request, Msg),
%% bridge buffer worker has retry, do not let ehttpc retry
Retry = 0,
Retry = 2,
ClientId = maps:get(clientid, Msg, undefined),
on_query(
InstId,
{undefined, Method, {Path, Headers, Body}, Timeout, Retry},
{ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
State
)
end;
@ -371,9 +350,10 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
headers := Headers,
request_timeout := Timeout
} = process_request(Request, Msg),
ClientId = maps:get(clientid, Msg, undefined),
on_query_async(
InstId,
{undefined, Method, {Path, Headers, Body}, Timeout},
{ClientId, Method, {Path, Headers, Body}, Timeout},
ReplyFunAndArgs,
State
)
@ -395,12 +375,22 @@ on_query_async(
}
),
NRequest = formalize_request(Method, BasePath, Request),
MaxAttempts = maps:get(max_attempts, State, 3),
Context = #{
attempt => 1,
max_attempts => MaxAttempts,
state => State,
key_or_num => KeyOrNum,
method => Method,
request => NRequest,
timeout => Timeout
},
ok = ehttpc:request_async(
Worker,
Method,
NRequest,
Timeout,
{fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
{fun ?MODULE:reply_delegator/3, [Context, ReplyFunAndArgs]}
),
{ok, Worker}.
@ -582,18 +572,6 @@ make_method(M) when M == <<"PUT">>; M == <<"put">> -> put;
make_method(M) when M == <<"GET">>; M == <<"get">> -> get;
make_method(M) when M == <<"DELETE">>; M == <<"delete">> -> delete.
check_ssl_opts(Conf) ->
check_ssl_opts("base_url", Conf).
check_ssl_opts(URLFrom, Conf) ->
#{scheme := Scheme} = hocon_maps:get(URLFrom, Conf),
SSL = hocon_maps:get("ssl", Conf),
case {Scheme, maps:get(enable, SSL, false)} of
{http, false} -> true;
{https, true} -> true;
{_, _} -> false
end.
formalize_request(Method, BasePath, {Path, Headers, _Body}) when
Method =:= get; Method =:= delete
->
@ -636,7 +614,10 @@ to_bin(Str) when is_list(Str) ->
to_bin(Atom) when is_atom(Atom) ->
atom_to_binary(Atom, utf8).
reply_delegator(ReplyFunAndArgs, Result) ->
reply_delegator(Context, ReplyFunAndArgs, Result) ->
spawn(fun() -> maybe_retry(Result, Context, ReplyFunAndArgs) end).
transform_result(Result) ->
case Result of
%% The normal reason happens when the HTTP connection times out before
%% the request has been fully processed
@ -647,16 +628,47 @@ reply_delegator(ReplyFunAndArgs, Result) ->
Reason =:= {shutdown, normal};
Reason =:= {shutdown, closed}
->
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
{error, {recoverable_error, Reason}};
{error, {closed, _Message} = Reason} ->
%% _Message = "The connection was lost."
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
{error, {recoverable_error, Reason}};
_ ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
Result
end.
maybe_retry(Result0, _Context = #{attempt := N, max_attempts := Max}, ReplyFunAndArgs) when
N >= Max
->
Result = transform_result(Result0),
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
#{
state := State,
attempt := Attempt,
key_or_num := KeyOrNum,
method := Method,
request := Request,
timeout := Timeout
} = Context,
%% TODO: reset the expiration time for free retries?
IsFreeRetry = Reason =:= normal orelse Reason =:= {shutdown, normal},
NContext =
case IsFreeRetry of
true -> Context;
false -> Context#{attempt := Attempt + 1}
end,
Worker = resolve_pool_worker(State, KeyOrNum),
ok = ehttpc:request_async(
Worker,
Method,
Request,
Timeout,
{fun ?MODULE:reply_delegator/3, [NContext, ReplyFunAndArgs]}
),
ok;
maybe_retry(Result, _Context, ReplyFunAndArgs) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
%% The HOCON schema system may generate sensitive keys with this format
is_sensitive_key([{str, StringKey}]) ->
is_sensitive_key(StringKey);

View File

@ -29,6 +29,10 @@
-compile(nowarn_export_all).
-compile(export_all).
-type url() :: emqx_http_lib:uri_map().
-reflect_type([url/0]).
-typerefl_from_string({url/0, emqx_http_lib, uri_parse}).
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
@ -314,7 +318,7 @@ t_sub_fields(_Config) ->
ok.
t_complicated_type(_Config) ->
Path = "/ref/complicated_type",
Path = "/ref/complex_type",
Object = #{
<<"content">> => #{
<<"application/json">> =>
@ -633,14 +637,14 @@ schema("/error") ->
}
}
};
schema("/ref/complicated_type") ->
schema("/ref/complex_type") ->
#{
operationId => test,
post => #{
responses => #{
200 => [
{no_neg_integer, hoconsc:mk(non_neg_integer(), #{})},
{url, hoconsc:mk(emqx_connector_http:url(), #{})},
{url, hoconsc:mk(url(), #{})},
{server, hoconsc:mk(emqx_schema:ip_port(), #{})},
{connect_timeout, hoconsc:mk(emqx_connector_http:connect_timeout(), #{})},
{pool_type, hoconsc:mk(emqx_connector_http:pool_type(), #{})},

Some files were not shown because too many files have changed in this diff Show More