Merge remote-tracking branch 'origin/main-v4.3' into main-v4.4

Ivan Dyachkov 2022-09-26 20:19:23 +02:00
commit 77a45f86cc
25 changed files with 1490 additions and 233 deletions

View File

@ -1,5 +1,5 @@
EMQX_AUTH__LDAP__SERVERS=ldap_server
EMQX_AUTH__MONGO__SERVER=mongo_server:27017
EMQX_AUTH__MONGO__SERVER=toxiproxy:27017
EMQX_AUTH__MYSQL__SERVER=mysql_server:3306
EMQX_AUTH__MYSQL__USERNAME=root
EMQX_AUTH__MYSQL__PASSWORD=public

View File

@ -0,0 +1,16 @@
version: '3.9'
services:
toxiproxy:
container_name: toxiproxy
image: ghcr.io/shopify/toxiproxy:2.5.0
restart: always
networks:
- emqx_bridge
volumes:
- "./toxiproxy.json:/config/toxiproxy.json"
ports:
- 8474:8474
command:
- "-host=0.0.0.0"
- "-config=/config/toxiproxy.json"

View File

@ -0,0 +1,8 @@
[
{
"name": "mongo",
"listen": "0.0.0.0:27017",
"upstream": "mongo:27017",
"enabled": true
}
]
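
The resilience tests added later in this commit manipulate this proxy through toxiproxy's HTTP admin API on port 8474, the same calls the switch_proxy/timeout_proxy/reset_proxy helpers in emqx_auth_mongo_SUITE make via httpc. A rough curl sketch, assuming the admin API is reachable on localhost:8474:

# cut the MongoDB connection entirely
curl -X POST -H 'Content-Type: application/json' \
  -d '{"enabled":false}' http://localhost:8474/proxies/mongo
# or inject a timeout toxic that blackholes upstream traffic
curl -X POST -H 'Content-Type: application/json' \
  -d '{"name":"timeout","type":"timeout","stream":"upstream","toxicity":1.0,"attributes":{"timeout":0}}' \
  http://localhost:8474/proxies/mongo/toxics
# heal: drop the toxic, or reset every proxy to a clean state
curl -X DELETE http://localhost:8474/proxies/mongo/toxics/timeout
curl -X POST http://localhost:8474/reset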

View File

@ -0,0 +1,26 @@
name: 'Detect profiles'
inputs:
ci_git_token:
required: true
type: string
outputs:
profiles:
description: 'Detected profiles'
value: ${{ steps.detect-profiles.outputs.profiles}}
runs:
using: composite
steps:
- id: detect-profiles
shell: bash
run: |
git config --global --add safe.directory "$GITHUB_WORKSPACE"
if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "::set-output name=profiles::[\"emqx-ee\"]"
echo "https://ci%40emqx.io:${{ inputs.ci_git_token }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
echo "EMQX_NAME=emqx-ee" >> $GITHUB_ENV
else
echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]"
echo "EMQX_NAME=emqx" >> $GITHUB_ENV
fi

View File

@ -0,0 +1,95 @@
name: 'Create MacOS package'
inputs:
profile: # emqx, emqx-enterprise
required: true
type: string
otp: # 24.2.1-1, 23.3.4.9-3
required: true
type: string
os:
required: false
type: string
default: macos-11
apple_id_password:
required: true
type: string
apple_developer_identity:
required: true
type: string
apple_developer_id_bundle:
required: true
type: string
apple_developer_id_bundle_password:
required: true
type: string
runs:
using: composite
steps:
- name: prepare
shell: bash
run: |
brew update
brew install curl zip unzip gnu-sed kerl coreutils unixodbc freetds openssl@1.1
echo "/usr/local/opt/bison/bin" >> $GITHUB_PATH
echo "/usr/local/bin" >> $GITHUB_PATH
- uses: actions/cache@v2
id: cache
with:
path: ~/.kerl/${{ inputs.otp }}
key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit
- name: build erlang
if: steps.cache.outputs.cache-hit != 'true'
shell: bash
env:
KERL_BUILD_BACKEND: git
OTP_GITHUB_URL: https://github.com/emqx/otp
KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit
run: |
kerl update releases
kerl build ${{ inputs.otp }}
kerl install ${{ inputs.otp }} $HOME/.kerl/${{ inputs.otp }}
- name: build ${{ inputs.profile }}
env:
AUTO_INSTALL_BUILD_DEPS: 1
APPLE_SIGN_BINARIES: 1
APPLE_ID: developers@emqx.io
APPLE_TEAM_ID: 26N6HYJLZA
APPLE_ID_PASSWORD: ${{ inputs.apple_id_password }}
APPLE_DEVELOPER_IDENTITY: ${{ inputs.apple_developer_identity }}
APPLE_DEVELOPER_ID_BUNDLE: ${{ inputs.apple_developer_id_bundle }}
APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ inputs.apple_developer_id_bundle_password }}
shell: bash
run: |
. $HOME/.kerl/${{ inputs.otp }}/activate
make ensure-rebar3
sudo cp rebar3 /usr/local/bin/rebar3
make ${{ inputs.profile }}-zip
- name: test ${{ inputs.profile }}
shell: bash
run: |
pkg_name=$(basename _packages/${{ inputs.profile }}/${{ inputs.profile }}-*.zip)
unzip -q _packages/${{ inputs.profile }}/$pkg_name
gsed -i '/emqx_telemetry/d' ./emqx/data/loaded_plugins
./emqx/bin/emqx start || cat emqx/log/erlang.log.1
ready='no'
for i in {1..10}; do
if curl -fs 127.0.0.1:18083 > /dev/null; then
ready='yes'
break
fi
sleep 1
done
if [ "$ready" != "yes" ]; then
echo "Timed out waiting for emqx to be ready"
cat emqx/log/erlang.log.1
exit 1
fi
./emqx/bin/emqx_ctl status
if ! ./emqx/bin/emqx stop; then
cat emqx/log/erlang.log.1 || true
cat emqx/log/emqx.log.1 || true
echo "failed to stop emqx"
exit 1
fi
rm -rf emqx

View File

@ -23,24 +23,18 @@ jobs:
# prepare source with any OTP version, no need for a matrix
container: ghcr.io/emqx/emqx-builder/4.4-19:24.1.5-3-ubuntu20.04
outputs:
profiles: ${{ steps.set_profile.outputs.profiles}}
profiles: ${{ steps.detect-profiles.outputs.profiles}}
steps:
- uses: actions/checkout@v2
with:
path: source
fetch-depth: 0
- name: set profile
id: set_profile
shell: bash
- id: detect-profiles
working-directory: source
run: |
git config --global --add safe.directory "$GITHUB_WORKSPACE"
if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "::set-output name=profiles::[\"emqx-ee\"]"
else
echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]"
fi
uses: ./.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- name: get_all_deps
if: endsWith(github.repository, 'emqx')
run: |
@ -117,89 +111,35 @@ jobs:
profile: ${{fromJSON(needs.prepare.outputs.profiles)}}
otp:
- 24.1.5-3
macos:
- macos-11
exclude:
- profile: emqx-edge
runs-on: ${{ matrix.macos }}
os:
- macos-11
runs-on: ${{ matrix.os }}
steps:
- uses: actions/download-artifact@v2
with:
name: source
path: .
- name: unzip source code
run: unzip -q source.zip
- name: prepare
run: |
brew update
brew install curl zip unzip gnu-sed kerl unixodbc freetds
echo "/usr/local/bin" >> $GITHUB_PATH
git config --global credential.helper store
- uses: actions/cache@v2
id: cache
ln -s . source
unzip -q source.zip
rm source source.zip
- uses: ./.github/actions/package-macos
with:
path: ~/.kerl/${{ matrix.otp }}
key: otp-install-${{ matrix.otp }}-${{ matrix.macos }}-static-ssl-disable-hipe-disable-jit
- name: build erlang
if: steps.cache.outputs.cache-hit != 'true'
timeout-minutes: 60
env:
KERL_BUILD_BACKEND: git
OTP_GITHUB_URL: https://github.com/emqx/otp
KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit
run: |
kerl update releases
kerl build ${{ matrix.otp }}
kerl install ${{ matrix.otp }} $HOME/.kerl/${{ matrix.otp }}
- name: build
env:
APPLE_SIGN_BINARIES: 1
APPLE_ID: developers@emqx.io
APPLE_TEAM_ID: 26N6HYJLZA
APPLE_ID_PASSWORD: ${{ secrets.APPLE_ID_PASSWORD }}
APPLE_DEVELOPER_IDENTITY: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
APPLE_DEVELOPER_ID_BUNDLE: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
working-directory: source
run: |
. $HOME/.kerl/${{ matrix.otp }}/activate
make ensure-rebar3
sudo cp rebar3 /usr/local/bin/rebar3
rm -rf _build/${{ matrix.profile }}/lib
make ${{ matrix.profile }}-zip
- name: test
working-directory: source
run: |
set -x
pkg_name=$(find _packages/${{ matrix.profile }} -mindepth 1 -maxdepth 1 -iname \*.zip)
unzip -q $pkg_name
gsed -i '/emqx_telemetry/d' ./emqx/data/loaded_plugins
./emqx/bin/emqx start || cat emqx/log/erlang.log.1
ready='no'
for i in {1..10}; do
if curl -fs 127.0.0.1:18083 > /dev/null; then
ready='yes'
break
fi
sleep 1
done
if [ "$ready" != "yes" ]; then
echo "Timed out waiting for emqx to be ready"
cat emqx/log/erlang.log.1
exit 1
fi
./emqx/bin/emqx_ctl status
if ! ./emqx/bin/emqx stop; then
cat emqx/log/erlang.log.1 || true
cat emqx/log/emqx.log.1 || true
echo "failed to stop emqx"
exit 1
fi
rm -rf emqx
profile: ${{ matrix.profile }}
otp: ${{ matrix.otp }}
os: ${{ matrix.os }}
apple_id_password: ${{ secrets.APPLE_ID_PASSWORD }}
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@v1
with:
name: ${{ matrix.profile }}-${{ matrix.otp }}
path: source/_packages/${{ matrix.profile }}/.
path: _packages/${{ matrix.profile }}/.
linux:
runs-on: ubuntu-20.04

View File

@ -16,7 +16,7 @@ jobs:
strategy:
fail-fast: false
matrix:
erl_otp:
otp:
- 24.1.5-3
os:
- ubuntu20.04
@ -32,27 +32,20 @@ jobs:
- runs-on: aws-amd64
use-self-hosted: false
container: ghcr.io/emqx/emqx-builder/4.4-19:${{ matrix.erl_otp }}-${{ matrix.os }}
container: ghcr.io/emqx/emqx-builder/4.4-19:${{ matrix.otp }}-${{ matrix.os }}
steps:
- uses: actions/checkout@v1
- name: prepare
run: |
git config --global --add safe.directory "$GITHUB_WORKSPACE"
if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
echo "EMQX_NAME=emqx-ee" >> $GITHUB_ENV
else
echo "EMQX_NAME=emqx" >> $GITHUB_ENV
fi
- uses: ./.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- name: fix-git-unsafe-repository
run: git config --global --add safe.directory /__w/emqx/emqx
- uses: actions/cache@v2
with:
# dialyzer PLTs
path: ~/.cache/rebar3/
key: dialyer-${{ matrix.erl_otp }}
key: dialyer-${{ matrix.otp }}
- name: make xref
run: make xref
- name: make dialyzer
@ -117,65 +110,27 @@ jobs:
strategy:
fail-fast: false
matrix:
profile:
- emqx
otp:
- 24.1.5-3
macos:
- macos-11
runs-on: ${{ matrix.macos }}
- 24.1.5-3
os:
- macos-11
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v1
- name: prepare
run: |
if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
echo "EMQX_NAME=emqx-ee" >> $GITHUB_ENV
else
echo "EMQX_NAME=emqx" >> $GITHUB_ENV
fi
- name: prepare
run: |
brew update
brew install curl zip unzip gnu-sed kerl unixodbc freetds
echo "/usr/local/bin" >> $GITHUB_PATH
git config --global credential.helper store
- uses: actions/cache@v2
id: cache
- uses: ./.github/actions/detect-profiles
with:
path: ~/.kerl/${{ matrix.otp }}
key: otp-install-${{ matrix.otp }}-${{ matrix.macos }}-static-ssl-disable-hipe-disable-jit
- name: build erlang
if: steps.cache.outputs.cache-hit != 'true'
timeout-minutes: 60
env:
KERL_BUILD_BACKEND: git
OTP_GITHUB_URL: https://github.com/emqx/otp
KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit
run: |
kerl update releases
kerl build ${{ matrix.otp }}
kerl install ${{ matrix.otp }} $HOME/.kerl/${{ matrix.otp }}
- name: build
env:
APPLE_SIGN_BINARIES: 1
APPLE_ID: developers@emqx.io
APPLE_TEAM_ID: 26N6HYJLZA
APPLE_ID_PASSWORD: ${{ secrets.APPLE_ID_PASSWORD }}
APPLE_DEVELOPER_IDENTITY: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
APPLE_DEVELOPER_ID_BUNDLE: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
run: |
. $HOME/.kerl/${{ matrix.otp }}/activate
make ensure-rebar3
sudo cp rebar3 /usr/local/bin/rebar3
make ${EMQX_NAME}-zip
- uses: actions/upload-artifact@v1
if: failure()
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- uses: ./.github/actions/package-macos
with:
name: rebar3.crashdump
path: ./rebar3.crashdump
profile: ${{ matrix.profile }}
otp: ${{ matrix.otp }}
os: ${{ matrix.os }}
apple_id_password: ${{ secrets.APPLE_ID_PASSWORD }}
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- name: test
run: |
pkg_name=$(find _packages/${EMQX_NAME} -mindepth 1 -maxdepth 1 -iname \*.zip)

View File

@ -10,24 +10,18 @@ jobs:
container: ghcr.io/emqx/emqx-builder/4.4-19:24.1.5-3-ubuntu20.04
outputs:
profiles: ${{ steps.set_profile.outputs.profiles}}
s3dir: ${{ steps.set_profile.outputs.s3dir}}
profiles: ${{ steps.detect-profiles.outputs.profiles}}
steps:
- uses: actions/checkout@v2
with:
path: source
fetch-depth: 0
- name: set profile
id: set_profile
shell: bash
run: |
cd source
if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "::set-output name=profiles::[\"emqx-ee\"]"
else
echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]"
fi
- id: detect-profiles
working-directory: source
uses: ./.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
upload:
runs-on: ubuntu-20.04

View File

@ -82,6 +82,7 @@ jobs:
- name: docker-compose up
run: |
docker-compose \
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-${{ matrix.connect_type }}.yaml \
-f .ci/docker-compose-file/docker-compose.yaml \
up -d --build
@ -107,11 +108,11 @@ jobs:
- name: setup
if: matrix.network_type == 'ipv4'
run: |
echo "EMQX_AUTH__MONGO__SERVER=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' mongo):27017" >> "$GITHUB_ENV"
echo "EMQX_AUTH__MONGO__SERVER=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' toxiproxy):27017" >> "$GITHUB_ENV"
- name: setup
if: matrix.network_type == 'ipv6'
run: |
echo "EMQX_AUTH__MONGO__SERVER=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.GlobalIPv6Address}}{{end}}' mongo):27017" >> "$GITHUB_ENV"
echo "EMQX_AUTH__MONGO__SERVER=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.GlobalIPv6Address}}{{end}}' toxiproxy):27017" >> "$GITHUB_ENV"
- name: set git token
run: |
if make emqx-ee --dry-run > /dev/null 2>&1; then

View File

@ -6,7 +6,7 @@ on:
- v*
- e*
branches:
- 'main-v4.?'
- 'main-v4.[0-9]?'
pull_request:
jobs:
@ -54,6 +54,7 @@ jobs:
run: |
docker-compose \
-f .ci/docker-compose-file/docker-compose.yaml \
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
@ -81,6 +82,7 @@ jobs:
run: |
docker-compose \
-f .ci/docker-compose-file/docker-compose.yaml \
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \

View File

@ -9,22 +9,35 @@ File format:
- Use weight-2 heading for releases
- One list item per change topic
Change log ends with a list of GitHub PRs
## v4.3.22
## v4.3.21
### Enhancements
- TLS listener memory usage optimization
new option 'hibernate_after' to hibernate TLS process after idling
- TLS listener default buffer size to 4KB
Eliminate uncertainty that the buffer size is set by OS default
- TLS listener memory usage optimization [#9005](https://github.com/emqx/emqx/pull/9005).
New config `listener.ssl.$NAME.hibernate_after` to hibernate TLS connection process after idling.
Hibernation can reduce RAM usage significantly, but may cost more CPU.
This configuration is disabled by default.
Our preliminary tests show roughly a 50% decline in RAM usage when it is configured to '5s'.
- TLS listener default buffer size to 4KB [#9007](https://github.com/emqx/emqx/pull/9007)
Eliminates the uncertainty of the buffer size being set by the OS default.
- Fix inaccurate delayed publish caused by OS time changes. [#8908](https://github.com/emqx/emqx/pull/8908)
- Disable authorization for the `api/v4/emqx_prometheus` endpoint. [#8955](https://github.com/emqx/emqx/pull/8955)
- Added a test to prevent a last will testament message from being
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
## v4.3.20
### Bug fixes
- Fix rule-engine update behaviour which may initialize actions for disabled rules. [#8849](https://github.com/emqx/emqx/pull/8849)
- Fix the JWT plugin not supporting non-integer timestamp claims. [#8862](https://github.com/emqx/emqx/pull/8862)
- Fix inaccurate delayed publish caused by OS time changes. [#8908](https://github.com/emqx/emqx/pull/8908)
- Fix a possible dead loop caused by shared subscriptions with `shared_dispatch_ack_enabled=true`. [#8918](https://github.com/emqx/emqx/pull/8918)
- Fix dashboard binding IP address not working. [#8916](https://github.com/emqx/emqx/pull/8916)
- Fix rule SQL topic matching against null values, which previously failed. [#8927](https://github.com/emqx/emqx/pull/8927)
@ -32,9 +45,6 @@ File format:
`SELECT topic =~ 't' as r FROM "$events/client_connected"`.
The topic is a null value because there is no such field in the `$events/client_connected` event, so
matching it against a topic should return false.
- Added a test to prevent a last will testament message from being
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
- Disable authorization for the `api/v4/emqx_prometheus` endpoint. [#8955](https://github.com/emqx/emqx/pull/8955)
## v4.3.19

View File

@ -408,7 +408,7 @@ t_password_hash(_) ->
ok = application:start(emqx_auth_mnesia).
t_will_message_connection_denied(Config) when is_list(Config) ->
ClientId = Username = <<"subscriber">>,
ClientId = <<"subscriber">>,
Password = <<"p">>,
application:stop(emqx_auth_mnesia),
ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia]),

View File

@ -79,4 +79,3 @@ feedvar(Str, Var, Val) ->
re:replace(Str, Var, Val, [global, {return, binary}]).
description() -> "ACL with MongoDB".

View File

@ -1,6 +1,6 @@
{application, emqx_auth_mongo,
[{description, "EMQ X Authentication/ACL with MongoDB"},
{vsn, "4.4.4"}, % strict semver, bump manually!
{vsn, "4.4.5"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_mongo_sup]},
{applications, [kernel,stdlib,mongodb,ecpool]},

View File

@ -1,7 +1,8 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{<<"4\\.4\\.[2-3]">>,
[{"4.4.4",[{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[2-3]">>,
[{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
{"4.4.1",
@ -14,7 +15,8 @@
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{<<"4\\.4\\.[2-3]">>,
[{"4.4.4",[{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[2-3]">>,
[{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
{"4.4.1",

View File

@ -22,6 +22,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([ check/3
, description/0
@ -38,14 +39,22 @@
, available/3
]).
-ifdef(TEST).
-export([ is_superuser/3
, available/4
]).
-endif.
check(ClientInfo = #{password := Password}, AuthResult,
Env = #{authquery := AuthQuery, superquery := SuperQuery}) ->
?tp(emqx_auth_mongo_superuser_check_authn_enter, #{}),
#authquery{collection = Collection, field = Fields,
hash = HashType, selector = Selector} = AuthQuery,
Pool = maps:get(pool, Env, ?APP),
case query(Pool, Collection, maps:from_list(replvars(Selector, ClientInfo))) of
undefined -> ok;
{error, Reason} ->
?tp(emqx_auth_mongo_check_authn_error, #{error => Reason}),
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
{stop, AuthResult#{auth_result => not_authorized, anonymous => false}};
UserMap ->
@ -58,6 +67,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
end,
case Result of
ok ->
?tp(emqx_auth_mongo_superuser_check_authn_ok, #{}),
{stop, AuthResult#{is_superuser => is_superuser(Pool, SuperQuery, ClientInfo),
anonymous => false,
auth_result => success}};
@ -81,17 +91,24 @@ description() -> "Authentication with MongoDB".
is_superuser(_Pool, undefined, _ClientInfo) ->
false;
is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Selector}, ClientInfo) ->
case query(Pool, Coll, maps:from_list(replvars(Selector, ClientInfo))) of
undefined -> false;
{error, Reason} ->
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
false;
Row ->
case maps:get(Field, Row, false) of
true -> true;
_False -> false
end
end.
?tp(emqx_auth_mongo_superuser_query_enter, #{}),
Res =
case query(Pool, Coll, maps:from_list(replvars(Selector, ClientInfo))) of
undefined ->
%% returned when no matching documents are found
false;
{error, Reason} ->
?tp(emqx_auth_mongo_superuser_query_error, #{error => Reason}),
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
false;
Row ->
case maps:get(Field, Row, false) of
true -> true;
_False -> false
end
end,
?tp(emqx_auth_mongo_superuser_query_result, #{is_superuser => Res}),
Res.
%%--------------------------------------------------------------------
%% Availability Test
@ -114,6 +131,7 @@ available(Pool, Collection, Query) ->
available(Pool, Collection, Query, Fun) ->
try Fun(Pool, Collection, Query) of
{error, Reason} ->
?tp(emqx_auth_mongo_available_error, #{error => Reason}),
?LOG(error, "[MongoDB] ~p availability test error: ~0p", [Collection, Reason]),
{error, Reason};
Error = #{<<"code">> := Code} ->
@ -144,7 +162,16 @@ test_client_info() ->
%%--------------------------------------------------------------------
replvars(VarList, ClientInfo) ->
lists:map(fun(Var) -> replvar(Var, ClientInfo) end, VarList).
lists:foldl(
fun(Var, Selector) ->
case replvar(Var, ClientInfo) of
%% assumes that all fields are binaries...
{unmatchable, Field} -> [{Field, []} | Selector];
Interpolated -> [Interpolated | Selector]
end
end,
[],
VarList).
replvar({Field, <<"%u">>}, #{username := Username}) ->
{Field, Username};
@ -154,8 +181,8 @@ replvar({Field, <<"%C">>}, #{cn := CN}) ->
{Field, CN};
replvar({Field, <<"%d">>}, #{dn := DN}) ->
{Field, DN};
replvar(Selector, _ClientInfo) ->
Selector.
replvar({Field, _PlaceHolder}, _ClientInfo) ->
{unmatchable, Field}.
%%--------------------------------------------------------------------
%% MongoDB Connect/Query
@ -169,19 +196,57 @@ connect(Opts) ->
mongo_api:connect(Type, Hosts, Options, WorkerOptions).
query(Pool, Collection, Selector) ->
ecpool:with_client(Pool, fun(Conn) -> mongo_api:find_one(Conn, Collection, Selector, #{}) end).
Timeout = timer:seconds(15),
with_timeout(Timeout, fun() ->
ecpool:with_client(Pool, fun(Conn) -> mongo_api:find_one(Conn, Collection, Selector, #{}) end)
end).
query_multi(Pool, Collection, SelectorList) ->
?tp(emqx_auth_mongo_query_multi_enter, #{}),
Timeout = timer:seconds(45),
lists:reverse(lists:flatten(lists:foldl(fun(Selector, Acc1) ->
Batch = ecpool:with_client(Pool, fun(Conn) ->
case mongo_api:find(Conn, Collection, Selector, #{}) of
{error, Reason} ->
?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]),
[];
[] -> [];
{ok, Cursor} ->
mc_cursor:foldl(fun(O, Acc2) -> [O|Acc2] end, [], Cursor, 1000)
end
end),
[Batch|Acc1]
Res =
with_timeout(Timeout, fun() ->
ecpool:with_client(Pool, fun(Conn) ->
?tp(emqx_auth_mongo_query_multi_find_selector, #{}),
case find(Conn, Collection, Selector) of
{error, Reason} ->
?tp(emqx_auth_mongo_query_multi_error,
#{error => Reason}),
?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]),
[];
[] ->
?tp(emqx_auth_mongo_query_multi_no_results, #{}),
[];
{ok, Cursor} ->
mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)
end
end)
end),
case Res of
{error, timeout} ->
?tp(emqx_auth_mongo_query_multi_error, #{error => timeout}),
?LOG(error, "[MongoDB] query_multi timeout", []),
Acc1;
Batch ->
[Batch | Acc1]
end
end, [], SelectorList))).
find(Conn, Collection, Selector) ->
try
mongo_api:find(Conn, Collection, Selector, #{})
catch
K:E:S ->
{error, {K, E, S}}
end.
with_timeout(Timeout, Fun) ->
try
emqx_misc:nolink_apply(Fun, Timeout)
catch
exit:timeout ->
{error, timeout};
K:E:S ->
erlang:raise(K, E, S)
end.

View File

@ -30,6 +30,10 @@
, stop/1
]).
-ifdef(TEST).
-export([with_env/2]).
-endif.
%%--------------------------------------------------------------------
%% Application callbacks
%%--------------------------------------------------------------------

View File

@ -19,42 +19,98 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_auth_mongo.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(APP, emqx_auth_mongo).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(POOL(App), ecpool_worker:client(gproc_pool:pick_worker({ecpool, App}))).
-define(MONGO_CL_ACL, <<"mqtt_acl">>).
-define(MONGO_CL_USER, <<"mqtt_user">>).
-define(INIT_ACL, [{<<"username">>, <<"testuser">>, <<"clientid">>, <<"null">>, <<"subscribe">>, [<<"#">>]},
{<<"username">>, <<"dashboard">>, <<"clientid">>, <<"null">>, <<"pubsub">>, [<<"$SYS/#">>]},
{<<"username">>, <<"user3">>, <<"clientid">>, <<"null">>, <<"publish">>, [<<"a/b/c">>]}]).
-define(INIT_ACL, [ { <<"username">>, <<"testuser">>
, <<"clientid">>, <<"null">>
, <<"subscribe">>, [<<"#">>]
}
, { <<"username">>, <<"dashboard">>
, <<"clientid">>, <<"null">>
, <<"pubsub">>, [<<"$SYS/#">>]
}
, { <<"username">>, <<"user3">>
, <<"clientid">>, <<"null">>
, <<"publish">>, [<<"a/b/c">>]
}
]).
-define(INIT_AUTH, [{<<"username">>, <<"plain">>, <<"password">>, <<"plain">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, true},
{<<"username">>, <<"md5">>, <<"password">>, <<"1bc29b36f623ba82aaf6724fd3b16718">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false},
{<<"username">>, <<"sha">>, <<"password">>, <<"d8f4590320e1343a915b6394170650a8f35d6926">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false},
{<<"username">>, <<"sha256">>, <<"password">>, <<"5d5b09f6dcb2d53a5fffc60c4ac0d55fabdf556069d6631545f42aa6e3500f2e">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false},
{<<"username">>, <<"pbkdf2_password">>, <<"password">>, <<"cdedb5281bb2f801565a1122b2563515">>, <<"salt">>, <<"ATHENA.MIT.EDUraeburn">>, <<"is_superuser">>, false},
{<<"username">>, <<"bcrypt_foo">>, <<"password">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6">>, <<"salt">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.">>, <<"is_superuser">>, false}
]).
-define(INIT_AUTH, [ { <<"username">>, <<"plain">>
, <<"password">>, <<"plain">>
, <<"salt">>, <<"salt">>
, <<"is_superuser">>, true
}
, { <<"username">>, <<"md5">>
, <<"password">>, <<"1bc29b36f623ba82aaf6724fd3b16718">>
, <<"salt">>, <<"salt">>
, <<"is_superuser">>, false
}
, { <<"username">>, <<"sha">>
, <<"password">>, <<"d8f4590320e1343a915b6394170650a8f35d6926">>
, <<"salt">>, <<"salt">>
, <<"is_superuser">>, false
}
, { <<"username">>, <<"sha256">>
, <<"password">>, <<"5d5b09f6dcb2d53a5fffc60c4ac0d55fabdf556069d6631545f42aa6e3500f2e">>
, <<"salt">>, <<"salt">>
, <<"is_superuser">>, false
}
, { <<"username">>, <<"pbkdf2_password">>
, <<"password">>, <<"cdedb5281bb2f801565a1122b2563515">>
, <<"salt">>, <<"ATHENA.MIT.EDUraeburn">>
, <<"is_superuser">>, false
}
, { <<"username">>, <<"bcrypt_foo">>
, <<"password">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6">>
, <<"salt">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.">>
, <<"is_superuser">>, false
}
, { <<"username">>, <<"user_full">>
, <<"clientid">>, <<"client_full">>
, <<"common_name">>, <<"cn_full">>
, <<"distinguished_name">>, <<"dn_full">>
, <<"password">>, <<"plain">>
, <<"salt">>, <<"salt">>
, <<"is_superuser">>, false
}
]).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
OtherTCs = emqx_ct:all(?MODULE) -- resilience_tests(),
[ {group, resilience}
| OtherTCs].
init_per_suite(Cfg) ->
resilience_tests() ->
[ t_acl_superuser_timeout
, t_available_acl_query_no_connection
, t_available_acl_query_timeout
, t_available_authn_query_timeout
, t_authn_timeout
, t_available
].
groups() ->
[ {resilience, resilience_tests()}
].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1),
init_mongo_data(),
%% avoid inter-suite flakiness
ok = emqx_mod_acl_internal:unload([]),
Cfg.
Config.
end_per_suite(_Cfg) ->
deinit_mongo_data(),
@ -69,6 +125,81 @@ set_special_confs(emqx) ->
set_special_confs(_App) ->
ok.
init_per_group(resilience, Config) ->
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPortStr = os:getenv("PROXY_PORT", "8474"),
ProxyPort = list_to_integer(ProxyPortStr),
reset_proxy(ProxyHost, ProxyPort),
ProxyServer = ProxyHost ++ ":27017",
{ok, OriginalServer} = application:get_env(emqx_auth_mongo, server),
OriginalServerMap = maps:from_list(OriginalServer),
NewServerMap = OriginalServerMap#{hosts => [ProxyServer]},
NewServer = maps:to_list(NewServerMap),
emqx_ct_helpers:stop_apps([emqx_auth_mongo]),
Handler =
fun(App = emqx_auth_mongo) ->
application:set_env(emqx_auth_mongo, server, NewServer),
set_special_confs(App);
(App)->
set_special_confs(App)
end,
emqx_ct_helpers:start_apps([emqx_auth_mongo], Handler),
[ {original_server, OriginalServer}
, {proxy_host, ProxyHost}
, {proxy_port, ProxyPort}
| Config];
init_per_group(_Group, Config) ->
Config.
end_per_group(resilience, Config) ->
OriginalServer = ?config(original_server, Config),
application:set_env(emqx_auth_mongo, server, OriginalServer),
emqx_ct_helpers:stop_apps([emqx_auth_mongo]),
emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1),
ok;
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(t_authn_full_selector_variables, Config) ->
{ok, AuthQuery} = application:get_env(emqx_auth_mongo, auth_query),
OriginalSelector = proplists:get_value(selector, AuthQuery),
Selector = [ {<<"username">>, <<"%u">>}
, {<<"clientid">>, <<"%c">>}
, {<<"common_name">>, <<"%C">>}
, {<<"distinguished_name">>, <<"%d">>}
],
reload({auth_query, [{selector, Selector}]}),
init_mongo_data(),
[ {original_selector, OriginalSelector}
, {selector, Selector}
| Config];
init_per_testcase(_TestCase, Config) ->
init_mongo_data(),
Config.
end_per_testcase(t_authn_full_selector_variables, Config) ->
OriginalSelector = ?config(original_selector, Config),
reload({auth_query, [{selector, OriginalSelector}]}),
deinit_mongo_data(),
ok;
end_per_testcase(TestCase, Config)
when TestCase =:= t_available_acl_query_timeout;
TestCase =:= t_acl_superuser_timeout;
TestCase =:= t_authn_no_connection;
TestCase =:= t_available_acl_query_no_connection ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
reset_proxy(ProxyHost, ProxyPort),
%% force restart of clients because CI tends to get stuck...
application:stop(emqx_auth_mongo),
application:start(emqx_auth_mongo),
wait_for_stabilization(#{attempts => 10, interval_ms => 500}),
deinit_mongo_data(),
ok;
end_per_testcase(_TestCase, _Config) ->
deinit_mongo_data(),
ok.
init_mongo_data() ->
%% Users
{ok, Connection} = ?POOL(?APP),
@ -87,6 +218,14 @@ deinit_mongo_data() ->
%% Test cases
%%--------------------------------------------------------------------
%% for full coverage ;-)
t_authn_description(_Config) ->
?assert(is_list(emqx_auth_mongo:description())).
%% for full coverage ;-)
t_acl_description(_Config) ->
?assert(is_list(emqx_acl_mongo:description())).
t_check_auth(_) ->
Plain = #{zone => external, clientid => <<"client1">>, username => <<"plain">>},
Plain1 = #{zone => external, clientid => <<"client1">>, username => <<"plain2">>},
@ -116,7 +255,124 @@ t_check_auth(_) ->
{ok, #{is_superuser := false}} = emqx_access_control:authenticate(Pbkdf2#{password => <<"password">>}),
reload({auth_query, [{password_hash, {salt, bcrypt}}]}),
{ok, #{is_superuser := false}} = emqx_access_control:authenticate(Bcrypt#{password => <<"foo">>}),
{error, _} = emqx_access_control:authenticate(User1#{password => <<"foo">>}).
{error, _} = emqx_access_control:authenticate(User1#{password => <<"foo">>}),
%% bad field config
reload({auth_query, [{password_field, [<<"bad_field">>]}]}),
?assertEqual({error, password_error},
emqx_access_control:authenticate(Plain#{password => <<"plain">>})),
%% unknown username
Unknown = #{zone => unknown, clientid => <<"?">>, username => <<"?">>, password => <<"">>},
?assertEqual({error, not_authorized}, emqx_access_control:authenticate(Unknown)),
ok.
t_authn_full_selector_variables(Config) ->
Selector = ?config(selector, Config),
ClientInfo = #{ zone => external
, clientid => <<"client_full">>
, username => <<"user_full">>
, cn => <<"cn_full">>
, dn => <<"dn_full">>
, password => <<"plain">>
},
?assertMatch({ok, _}, emqx_access_control:authenticate(ClientInfo)),
EnvFields = [ clientid
, username
, cn
, dn
],
lists:foreach(
fun(Field) ->
UnauthorizedClientInfo = ClientInfo#{Field => <<"wrong">>},
?assertEqual({error, not_authorized},
emqx_access_control:authenticate(UnauthorizedClientInfo),
#{ field => Field
, client_info => UnauthorizedClientInfo
, selector => Selector
})
end,
EnvFields),
ok.
t_authn_interpolation_no_info(_Config) ->
Valid = #{zone => external, clientid => <<"client1">>,
username => <<"plain">>, password => <<"plain">>},
?assertMatch({ok, _}, emqx_access_control:authenticate(Valid)),
try
%% has values that are equal to placeholders
InterpolationUser = #{ <<"username">> => <<"%u">>
, <<"password">> => <<"plain">>
, <<"salt">> => <<"salt">>
, <<"is_superuser">> => true
},
{ok, Conn} = ?POOL(?APP),
{{true, _}, _} = mongo_api:insert(Conn, ?MONGO_CL_USER, InterpolationUser),
Invalid = maps:without([username], Valid),
?assertMatch({error, not_authorized}, emqx_access_control:authenticate(Invalid))
after
deinit_mongo_data(),
init_mongo_data()
end.
%% authenticates, but superquery returns no documents
t_authn_empty_is_superuser_collection(_Config) ->
{ok, SuperQuery} = application:get_env(emqx_auth_mongo, super_query),
Collection = list_to_binary(proplists:get_value(collection, SuperQuery)),
reload({auth_query, [{password_hash, plain}]}),
Plain = #{zone => external, clientid => <<"client1">>,
username => <<"plain">>, password => <<"plain">>},
ok = snabbkaffe:start_trace(),
?force_ordering(
#{?snk_kind := emqx_auth_mongo_superuser_check_authn_ok},
#{?snk_kind := truncate_coll_enter}),
?force_ordering(
#{?snk_kind := truncate_coll_done},
#{?snk_kind := emqx_auth_mongo_superuser_query_enter}),
try
spawn_link(fun() ->
?tp(truncate_coll_enter, #{}),
{ok, Conn} = ?POOL(?APP),
{true, _} = mongo_api:delete(Conn, Collection, _Selector = #{}),
?tp(truncate_coll_done, #{})
end),
?assertMatch({ok, #{is_superuser := false}}, emqx_access_control:authenticate(Plain)),
ok = snabbkaffe:stop(),
ok
after
init_mongo_data()
end.
t_available(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
Pool = ?APP,
SuperQuery = #superquery{collection = SuperCollection} = superquery(),
%% success;
?assertEqual(ok, emqx_auth_mongo:available(Pool, SuperQuery)),
%% error with code;
EmptySelector = #{},
?assertEqual(
{error, {mongo_error, 2}},
emqx_auth_mongo:available(Pool, SuperCollection, EmptySelector, fun error_code_query/3)),
%% exception (in query)
?assertMatch(
{error, _},
with_failure(down, ProxyHost, ProxyPort,
fun() ->
Collection = <<"mqtt_user">>,
Selector = #{},
emqx_auth_mongo:available(Pool, Collection, Selector)
end)),
%% exception (arbitrary function)
?assertMatch(
{error, _},
with_failure(down, ProxyHost, ProxyPort,
fun() ->
Collection = <<"mqtt_user">>,
Selector = #{},
RaisingFun = fun(_, _, _) -> error(some_error) end,
emqx_auth_mongo:available(Pool, Collection, Selector, RaisingFun)
end)),
ok.
t_check_acl(_) ->
{ok, Connection} = ?POOL(?APP),
@ -132,7 +388,30 @@ t_check_acl(_) ->
allow = emqx_access_control:check_acl(User2, subscribe, <<"$SYS/testuser/1">>),
allow = emqx_access_control:check_acl(User3, publish, <<"a/b/c">>),
deny = emqx_access_control:check_acl(User3, publish, <<"c">>),
deny = emqx_access_control:check_acl(User4, publish, <<"a/b/c">>).
deny = emqx_access_control:check_acl(User4, publish, <<"a/b/c">>),
%% undefined value to interpolate
User1Undef = User1#{clientid => undefined},
allow = emqx_access_control:check_acl(User1Undef, subscribe, <<"users/testuser/1">>),
ok.
t_acl_empty_results(_Config) ->
#aclquery{selector = Selector} = aclquery(),
User1 = #{zone => external, clientid => <<"client1">>, username => <<"testuser">>},
try
reload({acl_query, [{selector, []}]}),
?assertEqual(deny, emqx_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)),
ok
after
reload({acl_query, [{selector, Selector}]})
end,
ok.
t_acl_exception(_Config) ->
%% FIXME: is there a more authentic way to produce an exception in
%% `match'???
User1 = #{zone => external, clientid => not_a_binary, username => <<"testuser">>},
?assertEqual(deny, emqx_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)),
ok.
t_acl_super(_) ->
reload({auth_query, [{password_hash, plain}, {password_field, [<<"password">>]}]}),
@ -155,10 +434,175 @@ t_acl_super(_) ->
end,
emqtt:disconnect(C).
%% apparently, if the config is undefined in `emqx_auth_mongo_app:r',
%% this is allowed...
t_is_superuser_undefined(_Config) ->
Pool = ClientInfo = unused_in_this_case,
SuperQuery = undefined,
?assertNot(emqx_auth_mongo:is_superuser(Pool, SuperQuery, ClientInfo)),
ok.
t_authn_timeout(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
FailureType = timeout,
{ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>},
{username, <<"plain">>},
{password, <<"plain">>}]),
unlink(C),
?check_trace(
try
enable_failure(FailureType, ProxyHost, ProxyPort),
{error, {unauthorized_client, _}} = emqtt:connect(C),
ok
after
heal_failure(FailureType, ProxyHost, ProxyPort)
end,
fun(Trace) ->
%% fails with `{exit,{{{badmatch,{{error,closed},...'
?assertMatch([_], ?of_kind(emqx_auth_mongo_check_authn_error, Trace)),
ok
end),
ok.
%% tests query timeout failure
t_available_authn_query_timeout(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
FailureType = timeout,
SuperQuery = superquery(),
?check_trace(
#{timetrap => timer:seconds(60)},
try
enable_failure(FailureType, ProxyHost, ProxyPort),
Pool = ?APP,
%% query_multi returns an empty list even on failures.
?assertEqual({error, timeout}, emqx_auth_mongo:available(Pool, SuperQuery)),
ok
after
heal_failure(FailureType, ProxyHost, ProxyPort)
end,
fun(Trace) ->
?assertMatch(
[#{?snk_kind := emqx_auth_mongo_available_error , error := _}],
?of_kind(emqx_auth_mongo_available_error, Trace))
end),
ok.
%% tests query_multi failure
t_available_acl_query_no_connection(Config) ->
test_acl_query_failure(down, Config).
%% ensure query_multi has a timeout
t_available_acl_query_timeout(Config) ->
ct:timetrap(90000),
test_acl_query_failure(timeout, Config).
%% checks that `with_timeout' lets unknown errors pass through
t_query_multi_unknown_exception(_Config) ->
ok = meck:new(ecpool, [no_link, no_history, non_strict, passthrough]),
ok = meck:expect(ecpool, with_client, fun(_, _) -> throw(some_error) end),
Pool = ?APP,
Collection = ?MONGO_CL_ACL,
SelectorList = [#{<<"username">> => <<"user">>}],
try
?assertThrow(some_error, emqx_auth_mongo:query_multi(Pool, Collection, SelectorList))
after
meck:unload(ecpool)
end.
t_acl_superuser_timeout(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
FailureType = timeout,
reload({auth_query, [{password_hash, plain}, {password_field, [<<"password">>]}]}),
{ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>},
{username, <<"plain">>},
{password, <<"plain">>}]),
unlink(C),
?check_trace(
try
?force_ordering(
#{?snk_kind := emqx_auth_mongo_superuser_check_authn_ok},
#{?snk_kind := connection_will_cut}
),
?force_ordering(
#{?snk_kind := connection_cut},
#{?snk_kind := emqx_auth_mongo_superuser_query_enter}
),
spawn(fun() ->
?tp(connection_will_cut, #{}),
enable_failure(FailureType, ProxyHost, ProxyPort),
?tp(connection_cut, #{})
end),
{ok, _} = emqtt:connect(C),
ok = emqtt:disconnect(C),
ok
after
heal_failure(FailureType, ProxyHost, ProxyPort)
end,
fun(Trace) ->
?assertMatch(
[ #{ ?snk_kind := emqx_auth_mongo_superuser_query_error
, error := _
}
, #{ ?snk_kind := emqx_auth_mongo_superuser_query_result
, is_superuser := false
}
],
?of_kind([ emqx_auth_mongo_superuser_query_error
, emqx_auth_mongo_superuser_query_result
], Trace))
end),
ok.
%%--------------------------------------------------------------------
%% Utils
%%--------------------------------------------------------------------
test_acl_query_failure(FailureType, Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
ACLQuery = aclquery(),
?check_trace(
#{timetrap => timer:seconds(60)},
try
?force_ordering(
#{?snk_kind := emqx_auth_mongo_query_multi_enter},
#{?snk_kind := connection_will_cut}
),
?force_ordering(
#{?snk_kind := connection_cut},
#{?snk_kind := emqx_auth_mongo_query_multi_find_selector}
),
spawn(fun() ->
?tp(connection_will_cut, #{}),
enable_failure(FailureType, ProxyHost, ProxyPort),
?tp(connection_cut, #{})
end),
Pool = ?APP,
%% query_multi returns an empty list even on failures.
?assertMatch(ok, emqx_auth_mongo:available(Pool, ACLQuery)),
ok
after
heal_failure(FailureType, ProxyHost, ProxyPort)
end,
fun(Trace) ->
?assertMatch(
[#{?snk_kind := emqx_auth_mongo_query_multi_error , error := _}],
?of_kind(emqx_auth_mongo_query_multi_error, Trace))
end),
ok.
reload({Par, Vals}) when is_list(Vals) ->
application:stop(?APP),
{ok, TupleVals} = application:get_env(?APP, Par),
@ -171,3 +615,105 @@ reload({Par, Vals}) when is_list(Vals) ->
end, TupleVals),
application:set_env(?APP, Par, lists:append(NewVals, Vals)),
application:start(?APP).
superquery() ->
emqx_auth_mongo_app:with_env(super_query, fun(SQ) -> SQ end).
aclquery() ->
emqx_auth_mongo_app:with_env(acl_query, fun(SQ) -> SQ end).
%% TODO: any easier way to make mongo return a map with an error code???
error_code_query(Pool, Collection, Selector) ->
%% should be a query; this is to provoke an error return from
%% mongo.
WrongLimit = {},
ecpool:with_client(
Pool,
fun(Conn) ->
mongoc:transaction_query(
Conn,
fun(Conf = #{pool := Worker}) ->
Query = mongoc:count_query(Conf, Collection, Selector, WrongLimit),
{_, Res} = mc_worker_api:command(Worker, Query),
Res
end)
end).
wait_for_stabilization(#{attempts := Attempts, interval_ms := IntervalMS})
when Attempts > 0 ->
try
{ok, Conn} = ?POOL(?APP),
#{} = mongo_api:find_one(Conn, ?MONGO_CL_USER, #{}, #{}),
ok
catch
_:_ ->
ct:pal("mongodb connection still stabilizing... sleeping for ~b ms", [IntervalMS]),
ct:sleep(IntervalMS),
wait_for_stabilization(#{attempts => Attempts - 1, interval_ms => IntervalMS})
end;
wait_for_stabilization(_) ->
error(mongo_connection_did_not_stabilize).
%% TODO: move to ct helpers???
reset_proxy(ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/reset",
Body = <<>>,
{ok, {{_, 204, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [],
[{body_format, binary}]).
with_failure(FailureType, ProxyHost, ProxyPort, Fun) ->
enable_failure(FailureType, ProxyHost, ProxyPort),
try
Fun()
after
heal_failure(FailureType, ProxyHost, ProxyPort)
end.
enable_failure(FailureType, ProxyHost, ProxyPort) ->
case FailureType of
down -> switch_proxy(off, ProxyHost, ProxyPort);
timeout -> timeout_proxy(on, ProxyHost, ProxyPort);
latency_up -> latency_up_proxy(on, ProxyHost, ProxyPort)
end.
heal_failure(FailureType, ProxyHost, ProxyPort) ->
case FailureType of
down -> switch_proxy(on, ProxyHost, ProxyPort);
timeout -> timeout_proxy(off, ProxyHost, ProxyPort);
latency_up -> latency_up_proxy(off, ProxyHost, ProxyPort)
end.
switch_proxy(Switch, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo",
Body = case Switch of
off -> <<"{\"enabled\":false}">>;
on -> <<"{\"enabled\":true}">>
end,
{ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [],
[{body_format, binary}]).
timeout_proxy(on, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics",
Body = <<"{\"name\":\"timeout\",\"type\":\"timeout\","
"\"stream\":\"upstream\",\"toxicity\":1.0,"
"\"attributes\":{\"timeout\":0}}">>,
{ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [],
[{body_format, binary}]);
timeout_proxy(off, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics/timeout",
Body = <<>>,
{ok, {{_, 204, _}, _, _}} = httpc:request(delete, {Url, [], "application/json", Body}, [],
[{body_format, binary}]).
latency_up_proxy(on, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics",
Body = <<"{\"name\":\"latency_up\",\"type\":\"latency\","
"\"stream\":\"upstream\",\"toxicity\":1.0,"
"\"attributes\":{\"latency\":20000,\"jitter\":3000}}">>,
{ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [],
[{body_format, binary}]);
latency_up_proxy(off, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics/latency_up",
Body = <<>>,
{ok, {{_, 204, _}, _, _}} = httpc:request(delete, {Url, [], "application/json", Body}, [],
[{body_format, binary}]).

View File

@ -59,7 +59,7 @@
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {getopt, "1.0.1"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}}
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}}
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}

scripts/rel/cut4x.sh (new executable file, 238 lines)
View File

@ -0,0 +1,238 @@
#!/usr/bin/env bash
## cut a new 4.x release for EMQX (opensource or enterprise).
set -euo pipefail
# ensure dir
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.."
usage() {
cat <<EOF
$0 RELEASE_GIT_TAG [option]
RELEASE_GIT_TAG is a 'v*' or 'e*' tag for example:
v4.3.21
e4.4.10-alpha.2
options:
-h|--help: Print this usage.
-b|--base: Specify the current release base branch, can be one of
release-v43, release-v44, release-e43, or release-e44.
NOTE: this option should be used together with --dryrun.
--dryrun: Do not actually create the git tag.
--skip-appup: Skip checking appup
Useful when you are sure that the appup files are already updated.
NOTE: When cutting an 'e*' tag release, both the opensource and enterprise
repos should be found as a fetch-remote in the current git repo.
NOTE: For 4.3 series the current working branch must be 'release-v43' for opensource edition
and 'release-e43' for enterprise edition.
--.--[main-v4.3]------------------.-----------.---
\\ / \\
\`---[release-v43]----(v4.3.X) \\
\\ \\
.---[release-e43]-------------'--(e4.3.Y) \\
/ \\ V
--'------[main-v4.3-enterprise]---------------'----'---
The same applies to 4.4 series, however, a 4.3 branch is also the upstream branch
for the corresponding 4.4 branch. e.g. release-e44 has two upstreams: release-v44 and release-e43
EOF
}
logerr() {
echo -e "\e[31mERROR: $1\e[39m"
}
logmsg() {
echo -e "\e[33mINFO: $1\e[39m"
}
REL_BRANCH_CE="${REL_BRANCH_CE:-release-v43}"
REL_BRANCH_EE="${REL_BRANCH_EE:-release-e43}"
TAG="${1:-}"
case "$TAG" in
v*)
if [ -f EMQX_ENTERPRISE ]; then
logerr 'Cannot create v-tag on enterprise branch'
exit 1
fi
TAG_PREFIX='v'
APPUP_CHECK_PROFILE='emqx'
;;
e*)
if [ ! -f EMQX_ENTERPRISE ]; then
logerr 'Cannot create e-tag on opensource branch'
exit 1
fi
TAG_PREFIX='e'
APPUP_CHECK_PROFILE='emqx-ee'
;;
-h|--help)
usage
exit 0
;;
*)
logerr "Unknown version tag $TAG"
usage
exit 1
;;
esac
shift 1
SKIP_APPUP='no'
DRYRUN='no'
while [ "$#" -gt 0 ]; do
case $1 in
-h|--help)
usage
exit 0
;;
--skip-appup)
shift
SKIP_APPUP='yes'
;;
--dryrun)
shift
DRYRUN='yes'
;;
-b|--base)
BASE_BR="${2:-}"
if [ -z "${BASE_BR}" ]; then
logerr "Must specify which base branch"
exit 1
fi
shift 2
;;
*)
logerr "Unknown option $1"
exit 1
;;
esac
done
rel_branch() {
local tag="$1"
case "$tag" in
v4.3.*)
echo 'release-v43'
;;
e4.3.*)
echo 'release-e43'
;;
v4.4.*)
echo 'release-v44'
;;
e4.4.*)
echo 'release-e44'
;;
*)
logerr "Unsupported version tag $TAG"
exit 1
;;
esac
}
## Ensure the current work branch
assert_work_branch() {
local tag="$1"
local release_branch
release_branch="$(rel_branch "$tag")"
local base_branch
base_branch="${BASE_BR:-$(git branch --show-current)}"
if [ "$base_branch" != "$release_branch" ]; then
logerr "Base branch: $base_branch"
logerr "Relase tag must be on the release branch: $release_branch"
logerr "or must use -b|--base option to specify which release branch is current branch based on"
exit 1
fi
}
assert_work_branch "$TAG"
## Ensure no dirty changes
assert_not_dirty() {
local diff
diff="$(git diff --name-only)"
if [ -n "$diff" ]; then
logerr "Git status is not clean? Changed files:"
logerr "$diff"
exit 1
fi
}
assert_not_dirty
## Assert that the tag is not already created
assert_tag_absent() {
local tag="$1"
## Fail if the tag already exists
EXISTING="$(git tag --list "$tag")"
if [ -n "$EXISTING" ]; then
logerr "$tag already released?"
logerr 'This script refuses to force re-tag.'
logerr 'If re-tag is intended, you must first delete the tag from both local and remote'
exit 1
fi
}
assert_tag_absent "$TAG"
PKG_VSN=$(./pkg-vsn.sh)
## Assert package version is updated to the tag which is being created
assert_release_version() {
local tag="$1"
# shellcheck disable=SC2001
pkg_vsn="$(echo "$PKG_VSN" | sed 's/-[0-9a-f]\{8\}$//g')"
if [ "${TAG_PREFIX}${pkg_vsn}" != "${tag}" ]; then
logerr "The release version ($pkg_vsn) is different from the desired git tag."
logerr "Update the release version in emqx_release.hrl"
exit 1
fi
}
assert_release_version "$TAG"
## Check if all upstream branches are merged
if [ -z "${BASE_BR:-}" ]; then
./scripts/rel/sync-remotes.sh
else
./scripts/rel/sync-remotes.sh --base "$BASE_BR"
fi
## Check if the Chart versions are in sync
./scripts/check-chart-vsn.sh
## Check if app versions are bumped
./scripts/check-apps-vsn.sh
## Ensure appup files are updated
if [ "$SKIP_APPUP" = 'no' ]; then
logmsg "Checking appups"
./scripts/update-appup.sh "$APPUP_CHECK_PROFILE" --check
else
logmsg "Skipped checking appup updates"
fi
## Ensure relup paths are updated
case "${PKG_VSN}" in
4.3.*)
HAS_RELUP_DB='no'
;;
*)
HAS_RELUP_DB='yes'
;;
esac
if [ "$HAS_RELUP_DB" = 'yes' ]; then
if [ -f EMQX_ENTERPRISE ]; then
RELUP_PATHS='./data/relup-paths-ee.eterm'
else
RELUP_PATHS='./data/relup-paths.eterm'
fi
./scripts/relup-base-vsns.escript check-vsn-db "$PKG_VSN" "$RELUP_PATHS"
fi
if [ "$DRYRUN" = 'yes' ]; then
logmsg "Release tag is ready to be created with command: git tag $TAG"
else
git tag "$TAG"
logmsg "$TAG is created OK."
fi

scripts/rel/sync-remotes.sh (new executable file, 201 lines)
View File

@ -0,0 +1,201 @@
#!/usr/bin/env bash
set -euo pipefail
# ensure dir
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.."
CE_BASES=( 'release-v43' 'release-v44' 'main-v4.3' 'main-v4.4' )
EE_BASES=( 'release-e43' 'release-e44' 'main-v4.3-enterprise' 'main-v4.4-enterprise' )
usage() {
cat <<EOF
$0 [option]
options:
-h|--help:
This script works on one of the branches listed in the -b|--base option below.
It tries to merge (by default with the --ff-only option) the
upstream branches of the current working branch.
The upstream branches of each base branch are as below:
* v43: [] # no upstream for 4.3 opensource edition
* e43: [v43] # 4.3 enterprise has 4.3 opensource as upstream
* v44: [v43] # 4.4 opensource has 4.3 opensource as upstream
* e44: [e43, v44, v43] # 4.4 enterprise has all the above 3 as upstream
For e44:
v43 is an indirect upstream.
Ideally the merge of v43 should be done through v44 or e43,
but it does not hurt to apply a direct merge.
-b|--base:
The base branch of the current working branch, if the current branch
is not one of the following branches:
${CE_BASES[@]}
${EE_BASES[@]}
-i|--interactive:
With this option, the script will try to merge upstream
branches to local working branch interactively.
That is, there will be git prompts to edit commit messages etc.
Without this option, the script executes the 'git merge' command
with the '--ff-only' option, which conveniently pulls remote
updates if there are any, and fails when a fast-forward is not possible.
EOF
}
logerr() {
echo -e "\e[31mERROR: $1\e[39m"
}
logwarn() {
echo -e "\e[33mINFO: $1\e[39m"
}
logmsg() {
echo "INFO: $1"
}
INTERACTIVE='no'
while [ "$#" -gt 0 ]; do
case $1 in
-h|--help)
usage
exit 0
;;
-i|--interactive)
shift
INTERACTIVE='yes'
;;
-b|--base)
shift
BASE_BRANCH="$1"
shift
;;
*)
logerr "Unknown option $1"
exit 1
;;
esac
done
if [ -f EMQX_ENTERPRISE ]; then
IS_ENTERPRISE='yes'
BASE_BRANCHES=( "${EE_BASES[@]}" )
else
IS_ENTERPRISE='no'
BASE_BRANCHES=( "${CE_BASES[@]}" )
fi
CURRENT_BRANCH="$(git branch --show-current)"
BASE_BRANCH="${BASE_BRANCH:-${CURRENT_BRANCH}}"
## check if arg1 is one of the elements in arg2-N
is_element() {
local e match="$1"
shift
for e in "${@}"; do
if [ "$e" = "$match" ]; then
return 0
fi
done
return 1
}
if ! is_element "$BASE_BRANCH" "${BASE_BRANCHES[@]}"; then
logerr "Cannot work with branch $BASE_BRANCH"
logerr "The base branch must be one of: ${BASE_BRANCHES[*]}"
logerr "Change work branch to one of the above."
logerr "OR: use -b|--base to specify from which base branch is current working branch created"
exit 1
fi
## Find git remotes to fetch from.
##
## NOTE: For enterprise, the opensource repo must be added as a remote.
## Because not all changes in opensource repo are synced to enterprise repo immediately.
##
## NOTE: grep -v enterprise here, but why not match on the full repo name 'emqx/emqx.git'?
## It's because the git remote does not always end with .git
GIT_REMOTE_CE="$(git remote -v | grep 'emqx/emqx' | grep -v enterprise | grep fetch | head -1 | awk '{print $1}' || true)"
if [ -z "$GIT_REMOTE_CE" ]; then
logerr "Cannot find git remote for emqx/emqx"
exit 1
fi
REMOTES=( "${GIT_REMOTE_CE}" )
if [ "$IS_ENTERPRISE" = 'yes' ]; then
GIT_REMOTE_EE="$(git remote -v | grep 'emqx/emqx-enterprise' | grep fetch | head -1 | awk '{print $1}' || true)"
if [ -z "$GIT_REMOTE_EE" ]; then
logerr "Cannot find git remote for emqx/emqx-enterprise"
exit 1
fi
REMOTES+=( "$GIT_REMOTE_EE" )
fi
## Fetch the remotes
for remote in "${REMOTES[@]}"; do
logwarn "Fetching from remote=${remote} (force tag sync)."
git fetch "$remote" --tags --force
done
logmsg 'Fetched all remotes'
if [ "$INTERACTIVE" = 'yes' ]; then
MERGE_OPTS=''
else
## Using --ff-only to *check* if the remote is already merged
## Also conveniently merges it in case it's *not* merged but can be fast-forwarded
## Alternative is to check with 'git merge-base'
MERGE_OPTS='--ff-only'
fi
## Get the git remote reference of the given 'release-' or 'main-' branch
remote_ref() {
local branch="$1"
if is_element "$branch" "${EE_BASES[@]}"; then
echo -n "${GIT_REMOTE_EE}/${branch} "
else
echo -n "${GIT_REMOTE_CE}/${branch} "
fi
}
remote_refs() {
local br
for br in "${@}"; do
remote_ref "$br"
done
}
## Get upstream branches of the given branch
upstream_branches() {
local base="$1"
case "$base" in
release-v43|main-v4.3)
## no upstream for 4.3 opensource
remote_ref "$base"
;;
release-v44)
remote_refs "$base" 'release-v43'
;;
main-v4.4)
remote_refs "$base" 'main-v4.3'
;;
release-e43)
remote_refs "$base" 'release-v43'
;;
main-v4.3-enterprise)
remote_refs "$base" 'main-v4.3'
;;
release-e44)
remote_refs "$base" 'release-v44' 'release-e43' 'release-v43'
;;
main-v4.4-enterprise)
remote_refs "$base" 'main-v4.4' 'main-v4.3-enterprise' 'main-v4.3'
;;
esac
}
for remote_ref in $(upstream_branches "$BASE_BRANCH"); do
logmsg "Merging $remote_ref"
git merge $MERGE_OPTS "$remote_ref"
done
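
As a sketch, a non-interactive run from a release-e44 checkout, assuming the community and enterprise remotes are named 'origin' and 'ee' respectively (the script discovers the actual remote names itself), boils down to:

./scripts/rel/sync-remotes.sh --base release-e44
# roughly equivalent to:
#   git fetch origin --tags --force && git fetch ee --tags --force
#   git merge --ff-only ee/release-e44
#   git merge --ff-only origin/release-v44
#   git merge --ff-only ee/release-e43
#   git merge --ff-only origin/release-v43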

View File

@ -3,6 +3,9 @@
-mode(compile).
-define(RED, "\e[31m").
-define(RESET, "\e[39m").
usage() ->
"A script that fills in boilerplate for appup files.
@ -109,7 +112,7 @@ changes, supervisor changes, process restarts and so on. Also the load order of
the beam files might need updating.~n"),
halt(0);
warn_and_exit(false) ->
log("~nERROR: Incomplete appups found. Please inspect the output for more details.~n"),
logerr("Incomplete appups found. Please inspect the output for more details.~n", []),
halt(1).
prepare(Baseline, Options = #{make_command := MakeCommand, beams_dir := BeamDir}) ->
@ -474,8 +477,8 @@ check_appup(App, Upgrade, Downgrade, OldUpgrade, OldDowngrade) ->
ok;
{diffs, Diffs} ->
set_invalid(),
log("ERROR: Appup file for '~p' is not complete.~n"
"Missing:~100p~n", [App, Diffs]),
logerr("Appup file for '~p' is not complete.~n"
"Missing:~100p~n", [App, Diffs]),
notok
end.
@ -494,7 +497,7 @@ render_appup(App, File, Up, Down) ->
do_render_appup(File, Up, Down);
{error, enoent} when IsCheck ->
%% failed to read old file, exit
log("ERROR: ~s is missing", [File]),
logerr("~s is missing", [File]),
set_invalid()
end.
@ -580,8 +583,8 @@ diff_app(UpOrDown, App,
case UpOrDown =:= up of
true ->
%% only log for the upgrade case because it would be the same result
log("ERROR: Application '~p' contains changes, but its version is not updated. ~s",
[App, format_changes(Changes)]);
logerr("Application '~p' contains changes, but its version is not updated. ~s",
[App, format_changes(Changes)]);
false ->
ok
end;
@ -723,5 +726,8 @@ log(Msg) ->
log(Msg, Args) ->
io:format(standard_error, Msg, Args).
logerr(Msg, Args) ->
io:format(standard_error, ?RED ++ "ERROR: "++ Msg ++ ?RESET, Args).
otp_standard_apps() ->
[ssl, mnesia, kernel, asn1, stdlib].

View File

@ -2,7 +2,8 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.9",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
@ -13,7 +14,8 @@
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.8",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -253,7 +255,8 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.9",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
@ -264,7 +267,8 @@
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.8",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},

View File

@ -47,6 +47,8 @@
, index_of/2
, maybe_parse_ip/1
, ipv6_probe/1
, pmap/2
, pmap/3
]).
-export([ bin2hexstr_a_f_upper/1
@ -57,7 +59,13 @@
-export([ is_sane_id/1
]).
-export([
nolink_apply/1,
nolink_apply/2
]).
-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$").
-define(DEFAULT_PMAP_TIMEOUT, 5000).
-spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}.
is_sane_id(Str) ->
@ -330,6 +338,110 @@ hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0;
hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10;
hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10.
%% @doc Like lists:map/2, only the callback function is evaluated
%% concurrently.
-spec pmap(fun((A) -> B), list(A)) -> list(B).
pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B).
pmap(Fun, List, Timeout) when
is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0
->
nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout).
%% @doc Delegate a function to a worker process.
%% The function may spawn_link other processes but we do not
%% want the caller process to be linked.
%% This is done by isolating the possible link with a not-linked
%% middleman process.
nolink_apply(Fun) -> nolink_apply(Fun, infinity).
%% @doc Same as `nolink_apply/1', with a timeout.
-spec nolink_apply(function(), timer:timeout()) -> term().
nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
Caller = self(),
ResRef = make_ref(),
Middleman = erlang:spawn(make_middleman_fn(Caller, Fun, ResRef)),
receive
{ResRef, {normal, Result}} ->
Result;
{ResRef, {exception, {C, E, S}}} ->
erlang:raise(C, E, S);
{ResRef, {'EXIT', Reason}} ->
exit(Reason)
after Timeout ->
exit(Middleman, kill),
exit(timeout)
end.
-spec make_middleman_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()).
make_middleman_fn(Caller, Fun, ResRef) ->
fun() ->
process_flag(trap_exit, true),
CallerMRef = erlang:monitor(process, Caller),
Worker = erlang:spawn_link(make_worker_fn(Caller, Fun, ResRef)),
receive
{'DOWN', CallerMRef, process, _, _} ->
%% For whatever reason, if the caller is dead,
%% there is no reason to continue
exit(Worker, kill),
exit(normal);
{'EXIT', Worker, normal} ->
exit(normal);
{'EXIT', Worker, Reason} ->
%% worker exited with some reason other than 'normal'
_ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}),
exit(normal)
end
end.
-spec make_worker_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()).
make_worker_fn(Caller, Fun, ResRef) ->
fun() ->
Res =
try
{normal, Fun()}
catch
C:E:S ->
{exception, {C, E, S}}
end,
_ = erlang:send(Caller, {ResRef, Res}),
exit(normal)
end.
do_parallel_map(Fun, List) ->
Parent = self(),
PidList = lists:map(
fun(Item) ->
erlang:spawn_link(
fun() ->
Res =
try
{normal, Fun(Item)}
catch
C:E:St ->
{exception, {C, E, St}}
end,
Parent ! {self(), Res}
end
)
end,
List
),
lists:foldr(
fun(Pid, Acc) ->
receive
{Pid, {normal, Result}} ->
[Result | Acc];
{Pid, {exception, {C, E, St}}} ->
erlang:raise(C, E, St)
end
end,
[],
PidList
).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

View File

@ -146,3 +146,36 @@ t_now_to_secs(_) ->
t_now_to_ms(_) ->
?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))).
t_pmap_normal(_) ->
?assertEqual(
[5, 7, 9],
emqx_misc:pmap(
fun({A, B}) -> A + B end,
[{2, 3}, {3, 4}, {4, 5}]
)
).
t_pmap_timeout(_) ->
?assertExit(
timeout,
emqx_misc:pmap(
fun
(timeout) -> ct:sleep(1000);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, timeout],
100
)
).
t_pmap_exception(_) ->
?assertError(
foobar,
emqx_misc:pmap(
fun
(error) -> error(foobar);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, error]
)
).