Compare commits
215 Commits
dependabot
...
master
Author | SHA1 | Date |
---|---|---|
![]() |
bcd63344b8 | |
![]() |
cc3b26a3ac | |
![]() |
dd686c24a0 | |
![]() |
592c4e0045 | |
![]() |
073e3ea0a8 | |
![]() |
81978ceaeb | |
![]() |
6bfddd9952 | |
![]() |
cf608a73a5 | |
![]() |
a8200fb83d | |
![]() |
9ad65c6ac1 | |
![]() |
9ca3985bbd | |
![]() |
e17becb84d | |
![]() |
5dd8fefded | |
![]() |
7b85faf12a | |
![]() |
b0594271b2 | |
![]() |
d8aa39a310 | |
![]() |
fc0434afc8 | |
![]() |
5502af18b7 | |
![]() |
9f96e0957e | |
![]() |
109ffe7a70 | |
![]() |
1559aac486 | |
![]() |
68990f1538 | |
![]() |
5356d678cc | |
![]() |
11951f8f6c | |
![]() |
0aa4cdbaf3 | |
![]() |
281f8ddc83 | |
![]() |
b80513e941 | |
![]() |
822ed71282 | |
![]() |
b8fd5de2a5 | |
![]() |
3ee84d60ae | |
![]() |
3b52b658cd | |
![]() |
cba3dcbeda | |
![]() |
caf1897979 | |
![]() |
dbbd5e1458 | |
![]() |
0ab31df9d2 | |
![]() |
613fc644f5 | |
![]() |
b1a53568d6 | |
![]() |
d6651a1889 | |
![]() |
4cf7151139 | |
![]() |
4865999606 | |
![]() |
382feab7d1 | |
![]() |
6aad774075 | |
![]() |
649cbf1c79 | |
![]() |
4cde5e98a3 | |
![]() |
d631b5b296 | |
![]() |
26ec69d5f4 | |
![]() |
58b9ab0210 | |
![]() |
4644072fd8 | |
![]() |
bd87e3ce2b | |
![]() |
c9c4d1a196 | |
![]() |
11546b72f4 | |
![]() |
bcb70a9fb9 | |
![]() |
09ec31908b | |
![]() |
b94ec4014f | |
![]() |
74c346f9d1 | |
![]() |
8a33ef8576 | |
![]() |
6c2033ecbf | |
![]() |
51530588ef | |
![]() |
bba9d085d6 | |
![]() |
3162fe7a27 | |
![]() |
52b2d73b28 | |
![]() |
44e7f2e9b2 | |
![]() |
baf2b96cbc | |
![]() |
ba2d4f3df3 | |
![]() |
11aaa7b07d | |
![]() |
4250d01363 | |
![]() |
86853ac6ef | |
![]() |
810a4d3cf9 | |
![]() |
7b243ef7ad | |
![]() |
fcf76d28ba | |
![]() |
3b5d98c1d9 | |
![]() |
451b03ff99 | |
![]() |
f792418a68 | |
![]() |
4915cc0da6 | |
![]() |
15b3f4deb0 | |
![]() |
7a251c9ead | |
![]() |
37a89d0094 | |
![]() |
c313aa89f0 | |
![]() |
6db1c0a446 | |
![]() |
d4508a4f1d | |
![]() |
a6a9538e73 | |
![]() |
9f97bff7d0 | |
![]() |
577f1a7d8a | |
![]() |
e42021d314 | |
![]() |
08c58cc319 | |
![]() |
150fee87f1 | |
![]() |
6058b50c91 | |
![]() |
85cff5e7eb | |
![]() |
569f48f5a1 | |
![]() |
2cf86e76ee | |
![]() |
74cef7937d | |
![]() |
c658cfe269 | |
![]() |
a246551914 | |
![]() |
b1c8bc2421 | |
![]() |
200b5ab294 | |
![]() |
8d8ff6cf5d | |
![]() |
a23b8266b1 | |
![]() |
d69342a2fc | |
![]() |
e6bfc14cc9 | |
![]() |
3d1f0c756c | |
![]() |
83041a8b83 | |
![]() |
1c4402b12c | |
![]() |
ebb69f4ebf | |
![]() |
fd961f9da7 | |
![]() |
359bc38aa4 | |
![]() |
08f70e4a25 | |
![]() |
e408804efb | |
![]() |
e294d35703 | |
![]() |
303ff95e10 | |
![]() |
23f0e88b45 | |
![]() |
f0dd1bc4f4 | |
![]() |
9b30320ddb | |
![]() |
cae27293a5 | |
![]() |
81f4103d60 | |
![]() |
bab526be24 | |
![]() |
9307a82004 | |
![]() |
b8e8f7c8e0 | |
![]() |
a97a0d6400 | |
![]() |
8705956cdc | |
![]() |
f213569460 | |
![]() |
7e23f8d19f | |
![]() |
a676ede6b8 | |
![]() |
9e5e7a23c5 | |
![]() |
143086b0ef | |
![]() |
c569625dd1 | |
![]() |
7daab1ab23 | |
![]() |
077ee38530 | |
![]() |
b74189570d | |
![]() |
649cf88042 | |
![]() |
1496f7f778 | |
![]() |
91dd1183ad | |
![]() |
65ab81ff74 | |
![]() |
53d4cd3174 | |
![]() |
7d004b37da | |
![]() |
e5547005eb | |
![]() |
fada2a3fea | |
![]() |
b4a010d63b | |
![]() |
9bde981c44 | |
![]() |
7658e081c5 | |
![]() |
8dce530d15 | |
![]() |
a20d262327 | |
![]() |
d32f282feb | |
![]() |
1d728a05b2 | |
![]() |
49bff5c08a | |
![]() |
61eda0ff31 | |
![]() |
8f0d807c00 | |
![]() |
bceb5d43ed | |
![]() |
03fea34962 | |
![]() |
082514f557 | |
![]() |
c831f0772f | |
![]() |
ca455ad992 | |
![]() |
c347c2c285 | |
![]() |
a49cd78aae | |
![]() |
4065158be7 | |
![]() |
18721d05bc | |
![]() |
7f7d0741d2 | |
![]() |
2e39c4ad5e | |
![]() |
5b50d5433a | |
![]() |
eab440e0c1 | |
![]() |
e08425e67d | |
![]() |
f6f1d32da0 | |
![]() |
2924ec582a | |
![]() |
8dc1d1424a | |
![]() |
693d5dd394 | |
![]() |
f85db0a0e9 | |
![]() |
60aefd1065 | |
![]() |
c637422302 | |
![]() |
e80d43d14d | |
![]() |
b3074144cc | |
![]() |
6786c9b517 | |
![]() |
8913de10c0 | |
![]() |
5ddd7d7a6a | |
![]() |
d7cac74bed | |
![]() |
0b0a28ae44 | |
![]() |
c1e2801f41 | |
![]() |
8036baf22c | |
![]() |
268f887700 | |
![]() |
1d56ac6e5e | |
![]() |
4e0742c66f | |
![]() |
8c1302f455 | |
![]() |
b8a2a8ea18 | |
![]() |
b7c424a13d | |
![]() |
1b6494ab9a | |
![]() |
41bf5cd6ca | |
![]() |
548bcceab7 | |
![]() |
1beda1cd11 | |
![]() |
9da744c423 | |
![]() |
b2f2af6871 | |
![]() |
3fae704903 | |
![]() |
2d6b2bff8e | |
![]() |
dc342a35ac | |
![]() |
397c104a85 | |
![]() |
49b24a3049 | |
![]() |
7bf70aaab6 | |
![]() |
9a5d50f26a | |
![]() |
df1f4fad70 | |
![]() |
33eccb35da | |
![]() |
f6a0f56771 | |
![]() |
7631420eef | |
![]() |
8f94e9684c | |
![]() |
43f799508a | |
![]() |
1925ed2f55 | |
![]() |
a45f817f0e | |
![]() |
57959ac7d4 | |
![]() |
79020b2436 | |
![]() |
141d8144e4 | |
![]() |
4f21594707 | |
![]() |
117c8197d7 | |
![]() |
c728b98e79 | |
![]() |
9e65e0d048 | |
![]() |
7374123c5c | |
![]() |
d7112921a6 | |
![]() |
69f5b6fa6c | |
![]() |
8ae54ac325 | |
![]() |
65544f34ec |
|
@ -10,7 +10,7 @@ services:
|
|||
nofile: 1024
|
||||
image: openldap
|
||||
#ports:
|
||||
# - 389:389
|
||||
# - "389:389"
|
||||
volumes:
|
||||
- ./certs/ca.crt:/etc/certs/ca.crt
|
||||
restart: always
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
# LDAP authentication
|
||||
|
||||
To run manual tests with the default docker-compose files.
|
||||
|
||||
Expose openldap container port by uncommenting the `ports` config in `docker-compose-ldap.yaml `
|
||||
|
||||
To start openldap:
|
||||
|
||||
```
|
||||
docker-compose -f ./.ci/docker-compose-file/docker-compose.yaml -f ./.ci/docker-compose-file/docker-compose-ldap.yaml up -docker
|
||||
```
|
||||
|
||||
## LDAP database
|
||||
|
||||
LDAP database is populated from below files:
|
||||
```
|
||||
apps/emqx_ldap/test/data/emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
|
||||
apps/emqx_ldap/test/data/emqx.schema /usr/local/etc/openldap/schema/emqx.schema
|
||||
```
|
||||
|
||||
## Minimal EMQX config
|
||||
|
||||
```
|
||||
authentication = [
|
||||
{
|
||||
backend = ldap
|
||||
base_dn = "uid=${username},ou=testdevice,dc=emqx,dc=io"
|
||||
filter = "(& (objectClass=mqttUser) (uid=${username}))"
|
||||
mechanism = password_based
|
||||
method {
|
||||
is_superuser_attribute = isSuperuser
|
||||
password_attribute = userPassword
|
||||
type = hash
|
||||
}
|
||||
password = public
|
||||
pool_size = 8
|
||||
query_timeout = "5s"
|
||||
request_timeout = "10s"
|
||||
server = "localhost:1389"
|
||||
username = "cn=root,dc=emqx,dc=io"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## Example ldapsearch command
|
||||
|
||||
```
|
||||
ldapsearch -x -H ldap://localhost:389 -D "cn=root,dc=emqx,dc=io" -W -b "uid=mqttuser0007,ou=testdevice,dc=emqx,dc=io" "(&(objectClass=mqttUser)(uid=mqttuser0007))"
|
||||
```
|
||||
|
||||
## Example mqttx command
|
||||
|
||||
The client password hashes are generated from their username.
|
||||
|
||||
```
|
||||
# disabled user
|
||||
mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0006 -P mqttuser0006
|
||||
|
||||
# enabled super-user
|
||||
mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0007 -P mqttuser0007
|
||||
```
|
|
@ -51,7 +51,7 @@ runs:
|
|||
echo "SELF_HOSTED=false" >> $GITHUB_OUTPUT
|
||||
;;
|
||||
esac
|
||||
- uses: actions/cache@ab5e6d0c87105b4c9c2047343972218f562e4319 # v4.0.1
|
||||
- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
|
||||
id: cache
|
||||
if: steps.prepare.outputs.SELF_HOSTED != 'true'
|
||||
with:
|
||||
|
|
|
@ -152,7 +152,7 @@ jobs:
|
|||
echo "PROFILE=${PROFILE}" | tee -a .env
|
||||
echo "PKG_VSN=$(./pkg-vsn.sh ${PROFILE})" | tee -a .env
|
||||
zip -ryq -x@.github/workflows/.zipignore $PROFILE.zip .
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: ${{ matrix.profile }}
|
||||
path: ${{ matrix.profile }}.zip
|
||||
|
|
|
@ -163,7 +163,7 @@ jobs:
|
|||
echo "PROFILE=${PROFILE}" | tee -a .env
|
||||
echo "PKG_VSN=$(./pkg-vsn.sh ${PROFILE})" | tee -a .env
|
||||
zip -ryq -x@.github/workflows/.zipignore $PROFILE.zip .
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: ${{ matrix.profile }}
|
||||
path: ${{ matrix.profile }}.zip
|
||||
|
|
|
@ -83,7 +83,7 @@ jobs:
|
|||
id: build
|
||||
run: |
|
||||
make ${{ matrix.profile }}-tgz
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: "${{ matrix.profile }}-${{ matrix.arch }}.tar.gz"
|
||||
path: "_packages/emqx*/emqx-*.tar.gz"
|
||||
|
@ -110,7 +110,7 @@ jobs:
|
|||
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
|
||||
with:
|
||||
ref: ${{ github.event.inputs.ref }}
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
pattern: "${{ matrix.profile[0] }}-*.tar.gz"
|
||||
path: _packages
|
||||
|
@ -122,24 +122,25 @@ jobs:
|
|||
run: |
|
||||
ls -lR _packages/$PROFILE
|
||||
mv _packages/$PROFILE/*.tar.gz ./
|
||||
|
||||
- name: Enable containerd image store on Docker Engine
|
||||
run: |
|
||||
echo "$(jq '. += {"features": {"containerd-snapshotter": true}}' /etc/docker/daemon.json)" > daemon.json
|
||||
echo "$(sudo cat /etc/docker/daemon.json | jq '. += {"features": {"containerd-snapshotter": true}}')" > daemon.json
|
||||
sudo mv daemon.json /etc/docker/daemon.json
|
||||
sudo systemctl restart docker
|
||||
|
||||
- uses: docker/setup-qemu-action@68827325e0b33c7199eb31dd4e31fbe9023e06e3 # v3.0.0
|
||||
- uses: docker/setup-buildx-action@d70bba72b1f3fd22344832f00baa16ece964efeb # v3.3.0
|
||||
- uses: docker/setup-qemu-action@49b3bc8e6bdd4a60e6116a5414239cba5943d3cf # v3.2.0
|
||||
- uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1
|
||||
|
||||
- name: Login to hub.docker.com
|
||||
uses: docker/login-action@0d4c9c5ea7693da7b068278f7b52bda2a190a446 # v3.2.0
|
||||
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
|
||||
if: inputs.publish && contains(matrix.profile[1], 'docker.io')
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_HUB_USER }}
|
||||
password: ${{ secrets.DOCKER_HUB_TOKEN }}
|
||||
|
||||
- name: Login to AWS ECR
|
||||
uses: docker/login-action@0d4c9c5ea7693da7b068278f7b52bda2a190a446 # v3.2.0
|
||||
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
|
||||
if: inputs.publish && contains(matrix.profile[1], 'public.ecr.aws')
|
||||
with:
|
||||
registry: public.ecr.aws
|
||||
|
|
|
@ -51,7 +51,7 @@ jobs:
|
|||
if: always()
|
||||
run: |
|
||||
docker save $_EMQX_DOCKER_IMAGE_TAG | gzip > $EMQX_NAME-docker-$PKG_VSN.tar.gz
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: "${{ env.EMQX_NAME }}-docker"
|
||||
path: "${{ env.EMQX_NAME }}-docker-${{ env.PKG_VSN }}.tar.gz"
|
||||
|
|
|
@ -95,7 +95,7 @@ jobs:
|
|||
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
|
||||
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
|
||||
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: success()
|
||||
with:
|
||||
name: ${{ matrix.profile }}-${{ matrix.os }}-${{ matrix.otp }}
|
||||
|
@ -180,7 +180,7 @@ jobs:
|
|||
--builder $BUILDER \
|
||||
--elixir $IS_ELIXIR \
|
||||
--pkgtype pkg
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: ${{ matrix.profile }}-${{ matrix.os }}-${{ matrix.arch }}${{ matrix.with_elixir == 'yes' && '-elixir' || '' }}-${{ matrix.builder }}-${{ matrix.otp }}-${{ matrix.elixir }}
|
||||
path: _packages/${{ matrix.profile }}/
|
||||
|
@ -198,7 +198,7 @@ jobs:
|
|||
profile:
|
||||
- ${{ inputs.profile }}
|
||||
steps:
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
pattern: "${{ matrix.profile }}-*"
|
||||
path: packages/${{ matrix.profile }}
|
||||
|
|
|
@ -23,6 +23,7 @@ jobs:
|
|||
profile:
|
||||
- ['emqx', 'master']
|
||||
- ['emqx', 'release-57']
|
||||
- ['emqx', 'release-58']
|
||||
os:
|
||||
- ubuntu22.04
|
||||
- amzn2023
|
||||
|
@ -53,7 +54,7 @@ jobs:
|
|||
- name: build pkg
|
||||
run: |
|
||||
./scripts/buildx.sh --profile "$PROFILE" --pkgtype pkg --builder "$BUILDER"
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: success()
|
||||
with:
|
||||
name: ${{ matrix.profile[0] }}-${{ matrix.profile[1] }}-${{ matrix.os }}
|
||||
|
@ -101,7 +102,7 @@ jobs:
|
|||
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
|
||||
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
|
||||
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: success()
|
||||
with:
|
||||
name: ${{ matrix.profile }}-${{ matrix.os }}
|
||||
|
|
|
@ -41,13 +41,13 @@ jobs:
|
|||
- name: build pkg
|
||||
run: |
|
||||
./scripts/buildx.sh --profile $PROFILE --pkgtype pkg --elixir $ELIXIR --arch $ARCH
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: "${{ matrix.profile[0] }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"
|
||||
path: _packages/${{ matrix.profile[0] }}/*
|
||||
retention-days: 7
|
||||
compression-level: 0
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: "${{ matrix.profile[0] }}-schema-dump-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"
|
||||
path: |
|
||||
|
@ -84,7 +84,7 @@ jobs:
|
|||
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
|
||||
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
|
||||
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: ${{ matrix.os }}
|
||||
path: _packages/**/*
|
||||
|
|
|
@ -37,7 +37,7 @@ jobs:
|
|||
- run: ./scripts/check-elixir-deps-discrepancies.exs
|
||||
- run: ./scripts/check-elixir-applications.exs
|
||||
- name: Upload produced lock files
|
||||
uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: failure()
|
||||
with:
|
||||
name: ${{ matrix.profile }}_produced_lock_files
|
||||
|
|
|
@ -24,6 +24,7 @@ jobs:
|
|||
branch:
|
||||
- master
|
||||
- release-57
|
||||
- release-58
|
||||
language:
|
||||
- cpp
|
||||
- python
|
||||
|
|
|
@ -24,6 +24,7 @@ jobs:
|
|||
ref:
|
||||
- master
|
||||
- release-57
|
||||
- release-58
|
||||
steps:
|
||||
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
|
||||
with:
|
||||
|
|
|
@ -52,7 +52,7 @@ jobs:
|
|||
id: package_file
|
||||
run: |
|
||||
echo "PACKAGE_FILE=$(find _packages/emqx -name 'emqx-*.deb' | head -n 1 | xargs basename)" >> $GITHUB_OUTPUT
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: emqx-ubuntu20.04
|
||||
path: _packages/emqx/${{ steps.package_file.outputs.PACKAGE_FILE }}
|
||||
|
@ -77,7 +77,7 @@ jobs:
|
|||
repository: emqx/tf-emqx-performance-test
|
||||
path: tf-emqx-performance-test
|
||||
ref: v0.2.3
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: emqx-ubuntu20.04
|
||||
path: tf-emqx-performance-test/
|
||||
|
@ -113,13 +113,13 @@ jobs:
|
|||
working-directory: ./tf-emqx-performance-test
|
||||
run: |
|
||||
terraform destroy -auto-approve
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: success()
|
||||
with:
|
||||
name: metrics
|
||||
path: |
|
||||
"./tf-emqx-performance-test/*.tar.gz"
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: failure()
|
||||
with:
|
||||
name: terraform
|
||||
|
@ -148,7 +148,7 @@ jobs:
|
|||
repository: emqx/tf-emqx-performance-test
|
||||
path: tf-emqx-performance-test
|
||||
ref: v0.2.3
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: emqx-ubuntu20.04
|
||||
path: tf-emqx-performance-test/
|
||||
|
@ -184,13 +184,13 @@ jobs:
|
|||
working-directory: ./tf-emqx-performance-test
|
||||
run: |
|
||||
terraform destroy -auto-approve
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: success()
|
||||
with:
|
||||
name: metrics
|
||||
path: |
|
||||
"./tf-emqx-performance-test/*.tar.gz"
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: failure()
|
||||
with:
|
||||
name: terraform
|
||||
|
@ -220,7 +220,7 @@ jobs:
|
|||
repository: emqx/tf-emqx-performance-test
|
||||
path: tf-emqx-performance-test
|
||||
ref: v0.2.3
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: emqx-ubuntu20.04
|
||||
path: tf-emqx-performance-test/
|
||||
|
@ -257,13 +257,13 @@ jobs:
|
|||
working-directory: ./tf-emqx-performance-test
|
||||
run: |
|
||||
terraform destroy -auto-approve
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: success()
|
||||
with:
|
||||
name: metrics
|
||||
path: |
|
||||
"./tf-emqx-performance-test/*.tar.gz"
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: failure()
|
||||
with:
|
||||
name: terraform
|
||||
|
@ -294,7 +294,7 @@ jobs:
|
|||
repository: emqx/tf-emqx-performance-test
|
||||
path: tf-emqx-performance-test
|
||||
ref: v0.2.3
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: emqx-ubuntu20.04
|
||||
path: tf-emqx-performance-test/
|
||||
|
@ -330,13 +330,13 @@ jobs:
|
|||
working-directory: ./tf-emqx-performance-test
|
||||
run: |
|
||||
terraform destroy -auto-approve
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: success()
|
||||
with:
|
||||
name: metrics
|
||||
path: |
|
||||
"./tf-emqx-performance-test/*.tar.gz"
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: failure()
|
||||
with:
|
||||
name: terraform
|
||||
|
|
|
@ -25,7 +25,7 @@ jobs:
|
|||
- emqx
|
||||
- emqx-enterprise
|
||||
steps:
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: ${{ matrix.profile }}
|
||||
- name: extract artifact
|
||||
|
@ -40,7 +40,7 @@ jobs:
|
|||
if: failure()
|
||||
run: |
|
||||
cat _build/${{ matrix.profile }}/rel/emqx/log/erlang.log.*
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: failure()
|
||||
with:
|
||||
name: conftest-logs-${{ matrix.profile }}
|
||||
|
|
|
@ -35,7 +35,7 @@ jobs:
|
|||
source env.sh
|
||||
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh "$EMQX_NAME")
|
||||
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: ${{ env.EMQX_NAME }}-docker
|
||||
path: /tmp
|
||||
|
@ -69,7 +69,6 @@ jobs:
|
|||
shell: bash
|
||||
env:
|
||||
EMQX_NAME: ${{ matrix.profile }}
|
||||
_EMQX_TEST_DB_BACKEND: ${{ matrix.cluster_db_backend }}
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
|
@ -78,18 +77,20 @@ jobs:
|
|||
- emqx
|
||||
- emqx-enterprise
|
||||
- emqx-elixir
|
||||
cluster_db_backend:
|
||||
- mnesia
|
||||
- rlog
|
||||
steps:
|
||||
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
|
||||
- name: Set up environment
|
||||
id: env
|
||||
run: |
|
||||
source env.sh
|
||||
if [ "$EMQX_NAME" = "emqx-enterprise" ]; then
|
||||
_EMQX_TEST_DB_BACKEND='rlog'
|
||||
else
|
||||
_EMQX_TEST_DB_BACKEND='mnesia'
|
||||
fi
|
||||
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh "$EMQX_NAME")
|
||||
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: ${{ env.EMQX_NAME }}-docker
|
||||
path: /tmp
|
||||
|
|
|
@ -95,7 +95,7 @@ jobs:
|
|||
echo "Suites: $SUITES"
|
||||
./rebar3 as standalone_test ct --name 'test@127.0.0.1' -v --readable=true --suite="$SUITES"
|
||||
fi
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: failure()
|
||||
with:
|
||||
name: logs-emqx-app-tests-${{ matrix.type }}
|
||||
|
|
|
@ -44,7 +44,7 @@ jobs:
|
|||
source env.sh
|
||||
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh "$EMQX_NAME")
|
||||
echo "EMQX_TAG=$PKG_VSN" >> "$GITHUB_ENV"
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: "${{ env.EMQX_NAME }}-docker"
|
||||
path: /tmp
|
||||
|
|
|
@ -31,7 +31,7 @@ jobs:
|
|||
else
|
||||
wget --no-verbose --no-check-certificate -O /tmp/apache-jmeter.tgz $ARCHIVE_URL
|
||||
fi
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: apache-jmeter.tgz
|
||||
path: /tmp/apache-jmeter.tgz
|
||||
|
@ -58,7 +58,7 @@ jobs:
|
|||
source env.sh
|
||||
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
|
||||
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: emqx-docker
|
||||
path: /tmp
|
||||
|
@ -95,7 +95,7 @@ jobs:
|
|||
echo "check logs failed"
|
||||
exit 1
|
||||
fi
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: always()
|
||||
with:
|
||||
name: jmeter_logs-advanced_feat-${{ matrix.scripts_type }}
|
||||
|
@ -127,7 +127,7 @@ jobs:
|
|||
source env.sh
|
||||
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
|
||||
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: emqx-docker
|
||||
path: /tmp
|
||||
|
@ -175,7 +175,7 @@ jobs:
|
|||
if: failure()
|
||||
run: |
|
||||
docker compose -f .ci/docker-compose-file/docker-compose-emqx-cluster.yaml logs --no-color > ./jmeter_logs/emqx.log
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: always()
|
||||
with:
|
||||
name: jmeter_logs-pgsql_authn_authz-${{ matrix.scripts_type }}_${{ matrix.pgsql_tag }}
|
||||
|
@ -204,7 +204,7 @@ jobs:
|
|||
source env.sh
|
||||
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
|
||||
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: emqx-docker
|
||||
path: /tmp
|
||||
|
@ -248,7 +248,7 @@ jobs:
|
|||
echo "check logs failed"
|
||||
exit 1
|
||||
fi
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: always()
|
||||
with:
|
||||
name: jmeter_logs-mysql_authn_authz-${{ matrix.scripts_type }}_${{ matrix.mysql_tag }}
|
||||
|
@ -273,7 +273,7 @@ jobs:
|
|||
source env.sh
|
||||
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
|
||||
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: emqx-docker
|
||||
path: /tmp
|
||||
|
@ -313,7 +313,7 @@ jobs:
|
|||
echo "check logs failed"
|
||||
exit 1
|
||||
fi
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: always()
|
||||
with:
|
||||
name: jmeter_logs-JWT_authn-${{ matrix.scripts_type }}
|
||||
|
@ -339,7 +339,7 @@ jobs:
|
|||
source env.sh
|
||||
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
|
||||
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: emqx-docker
|
||||
path: /tmp
|
||||
|
@ -370,7 +370,7 @@ jobs:
|
|||
echo "check logs failed"
|
||||
exit 1
|
||||
fi
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: always()
|
||||
with:
|
||||
name: jmeter_logs-built_in_database_authn_authz-${{ matrix.scripts_type }}
|
||||
|
|
|
@ -25,7 +25,7 @@ jobs:
|
|||
run:
|
||||
shell: bash
|
||||
steps:
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: emqx-enterprise
|
||||
- name: extract artifact
|
||||
|
@ -45,7 +45,7 @@ jobs:
|
|||
run: |
|
||||
export PROFILE='emqx-enterprise'
|
||||
make emqx-enterprise-tgz
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
name: Upload built emqx and test scenario
|
||||
with:
|
||||
name: relup_tests_emqx_built
|
||||
|
@ -72,7 +72,7 @@ jobs:
|
|||
run:
|
||||
shell: bash
|
||||
steps:
|
||||
- uses: erlef/setup-beam@a6e26b22319003294c58386b6f25edbc7336819a # v1.18.0
|
||||
- uses: erlef/setup-beam@b9c58b0450cd832ccdb3c17cc156a47065d2114f # v1.18.1
|
||||
with:
|
||||
otp-version: 26.2.5
|
||||
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
|
||||
|
@ -88,7 +88,7 @@ jobs:
|
|||
./configure
|
||||
make
|
||||
echo "$(pwd)/bin" >> $GITHUB_PATH
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
name: Download built emqx and test scenario
|
||||
with:
|
||||
name: relup_tests_emqx_built
|
||||
|
@ -111,7 +111,7 @@ jobs:
|
|||
docker logs node2.emqx.io | tee lux_logs/emqx2.log
|
||||
exit 1
|
||||
fi
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
name: Save debug data
|
||||
if: failure()
|
||||
with:
|
||||
|
|
|
@ -46,7 +46,7 @@ jobs:
|
|||
contents: read
|
||||
|
||||
steps:
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: ${{ matrix.profile }}
|
||||
|
||||
|
@ -90,7 +90,7 @@ jobs:
|
|||
contents: read
|
||||
|
||||
steps:
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: ${{ matrix.profile }}
|
||||
- name: extract artifact
|
||||
|
@ -133,7 +133,7 @@ jobs:
|
|||
if: failure()
|
||||
run: tar -czf logs.tar.gz _build/test/logs
|
||||
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: failure()
|
||||
with:
|
||||
name: logs-${{ matrix.profile }}-${{ matrix.prefix }}-sg${{ matrix.suitegroup }}
|
||||
|
@ -164,7 +164,7 @@ jobs:
|
|||
CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-sg${{ matrix.suitegroup }}
|
||||
|
||||
steps:
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: ${{ matrix.profile }}
|
||||
- name: extract artifact
|
||||
|
@ -193,7 +193,7 @@ jobs:
|
|||
if: failure()
|
||||
run: tar -czf logs.tar.gz _build/test/logs
|
||||
|
||||
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
if: failure()
|
||||
with:
|
||||
name: logs-${{ matrix.profile }}-${{ matrix.prefix }}-sg${{ matrix.suitegroup }}
|
||||
|
|
|
@ -30,7 +30,7 @@ jobs:
|
|||
persist-credentials: false
|
||||
|
||||
- name: "Run analysis"
|
||||
uses: ossf/scorecard-action@dc50aa9510b46c811795eb24b2f1ba02a914e534 # v2.3.3
|
||||
uses: ossf/scorecard-action@62b2cac7ed8198b15735ed49ab1e5cf35480ba46 # v2.4.0
|
||||
with:
|
||||
results_file: results.sarif
|
||||
results_format: sarif
|
||||
|
@ -40,7 +40,7 @@ jobs:
|
|||
publish_results: true
|
||||
|
||||
- name: "Upload artifact"
|
||||
uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
|
||||
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
|
||||
with:
|
||||
name: SARIF file
|
||||
path: results.sarif
|
||||
|
|
|
@ -19,7 +19,7 @@ jobs:
|
|||
- emqx-enterprise
|
||||
runs-on: ${{ endsWith(github.repository, '/emqx') && 'ubuntu-22.04' || fromJSON('["self-hosted","ephemeral","linux","x64"]') }}
|
||||
steps:
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
pattern: "${{ matrix.profile }}-schema-dump-*-x64"
|
||||
merge-multiple: true
|
||||
|
|
|
@ -30,7 +30,7 @@ jobs:
|
|||
include: ${{ fromJson(inputs.ct-matrix) }}
|
||||
container: "${{ inputs.builder }}"
|
||||
steps:
|
||||
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
|
||||
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
|
||||
with:
|
||||
name: ${{ matrix.profile }}
|
||||
- name: extract artifact
|
||||
|
|
|
@ -34,7 +34,7 @@ jobs:
|
|||
pull-requests: write
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2
|
||||
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
|
4
Makefile
4
Makefile
|
@ -10,8 +10,8 @@ include env.sh
|
|||
|
||||
# Dashboard version
|
||||
# from https://github.com/emqx/emqx-dashboard5
|
||||
export EMQX_DASHBOARD_VERSION ?= v1.9.1
|
||||
export EMQX_EE_DASHBOARD_VERSION ?= e1.7.1
|
||||
export EMQX_DASHBOARD_VERSION ?= v1.10.0-beta.1
|
||||
export EMQX_EE_DASHBOARD_VERSION ?= e1.8.0-beta.1
|
||||
|
||||
export EMQX_RELUP ?= true
|
||||
export EMQX_REL_FORM ?= tgz
|
||||
|
|
|
@ -65,9 +65,20 @@
|
|||
%% Route
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-record(share_dest, {
|
||||
session_id :: emqx_session:session_id(),
|
||||
group :: emqx_types:group()
|
||||
}).
|
||||
|
||||
-record(route, {
|
||||
topic :: binary(),
|
||||
dest :: node() | {binary(), node()} | emqx_session:session_id() | emqx_external_broker:dest()
|
||||
dest ::
|
||||
node()
|
||||
| {binary(), node()}
|
||||
| emqx_session:session_id()
|
||||
%% One session can also have multiple subscriptions to the same topic through different groups
|
||||
| #share_dest{}
|
||||
| emqx_external_broker:dest()
|
||||
}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -683,6 +683,7 @@ end).
|
|||
|
||||
-define(FRAME_PARSE_ERROR, frame_parse_error).
|
||||
-define(FRAME_SERIALIZE_ERROR, frame_serialize_error).
|
||||
|
||||
-define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})).
|
||||
-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})).
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
%% `apps/emqx/src/bpapi/README.md'
|
||||
|
||||
%% Opensource edition
|
||||
-define(EMQX_RELEASE_CE, "5.7.1").
|
||||
-define(EMQX_RELEASE_CE, "5.8.0-alpha.1").
|
||||
|
||||
%% Enterprise edition
|
||||
-define(EMQX_RELEASE_EE, "5.7.1").
|
||||
-define(EMQX_RELEASE_EE, "5.8.0-alpha.1").
|
||||
|
|
|
@ -41,16 +41,20 @@
|
|||
).
|
||||
|
||||
%% NOTE: do not forget to use atom for msg and add every used msg to
|
||||
%% the default value of `log.thorttling.msgs` list.
|
||||
%% the default value of `log.throttling.msgs` list.
|
||||
-define(SLOG_THROTTLE(Level, Data),
|
||||
?SLOG_THROTTLE(Level, Data, #{})
|
||||
).
|
||||
|
||||
-define(SLOG_THROTTLE(Level, Data, Meta),
|
||||
?SLOG_THROTTLE(Level, undefined, Data, Meta)
|
||||
).
|
||||
|
||||
-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta),
|
||||
case logger:allow(Level, ?MODULE) of
|
||||
true ->
|
||||
(fun(#{msg := __Msg} = __Data) ->
|
||||
case emqx_log_throttler:allow(__Msg) of
|
||||
case emqx_log_throttler:allow(__Msg, UniqueKey) of
|
||||
true ->
|
||||
logger:log(Level, __Data, Meta);
|
||||
false ->
|
||||
|
@ -87,7 +91,7 @@
|
|||
?_DO_TRACE(Tag, Msg, Meta),
|
||||
?SLOG(
|
||||
Level,
|
||||
(emqx_trace_formatter:format_meta_map(Meta))#{msg => Msg, tag => Tag},
|
||||
(Meta)#{msg => Msg, tag => Tag},
|
||||
#{is_trace => false}
|
||||
)
|
||||
end).
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
{emqx_ds,2}.
|
||||
{emqx_ds,3}.
|
||||
{emqx_ds,4}.
|
||||
{emqx_ds_shared_sub,1}.
|
||||
{emqx_eviction_agent,1}.
|
||||
{emqx_eviction_agent,2}.
|
||||
{emqx_eviction_agent,3}.
|
||||
|
|
|
@ -28,15 +28,14 @@
|
|||
{lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
|
||||
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
|
||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
|
||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}},
|
||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
|
||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
|
||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.1"}}},
|
||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},
|
||||
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
|
||||
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
||||
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
|
||||
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
|
||||
{ra, "2.7.3"}
|
||||
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}}
|
||||
]}.
|
||||
|
||||
{plugins, [{rebar3_proper, "0.12.1"}, rebar3_path_deps]}.
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
{application, emqx, [
|
||||
{id, "emqx"},
|
||||
{description, "EMQX Core"},
|
||||
{vsn, "5.3.3"},
|
||||
{vsn, "5.3.4"},
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
|
|
|
@ -146,7 +146,9 @@
|
|||
-type replies() :: emqx_types:packet() | reply() | [reply()].
|
||||
|
||||
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
|
||||
|
||||
-define(IS_CONNECTED_OR_REAUTHENTICATING(ConnState),
|
||||
((ConnState == connected) orelse (ConnState == reauthenticating))
|
||||
).
|
||||
-define(IS_COMMON_SESSION_TIMER(N),
|
||||
((N == retry_delivery) orelse (N == expire_awaiting_rel))
|
||||
).
|
||||
|
@ -337,7 +339,7 @@ take_conn_info_fields(Fields, ClientInfo, ConnInfo) ->
|
|||
| {shutdown, Reason :: term(), channel()}
|
||||
| {shutdown, Reason :: term(), replies(), channel()}.
|
||||
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when
|
||||
ConnState =:= connected orelse ConnState =:= reauthenticating
|
||||
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
|
||||
->
|
||||
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
|
||||
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
|
||||
|
@ -567,29 +569,8 @@ handle_in(
|
|||
process_disconnect(ReasonCode, Properties, NChannel);
|
||||
handle_in(?AUTH_PACKET(), Channel) ->
|
||||
handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
|
||||
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
|
||||
shutdown(shutdown_count(frame_error, Reason), Channel);
|
||||
handle_in(
|
||||
{frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting}
|
||||
) ->
|
||||
shutdown(
|
||||
shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
|
||||
);
|
||||
handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
|
||||
shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
|
||||
handle_in(
|
||||
{frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState}
|
||||
) when
|
||||
ConnState =:= connected orelse ConnState =:= reauthenticating
|
||||
->
|
||||
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
|
||||
handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when
|
||||
ConnState =:= connected orelse ConnState =:= reauthenticating
|
||||
->
|
||||
handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
|
||||
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
|
||||
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
|
||||
{ok, Channel};
|
||||
handle_in({frame_error, Reason}, Channel) ->
|
||||
handle_frame_error(Reason, Channel);
|
||||
handle_in(Packet, Channel) ->
|
||||
?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
|
||||
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
|
||||
|
@ -1021,6 +1002,68 @@ not_nacked({deliver, _Topic, Msg}) ->
|
|||
true
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle Frame Error
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
handle_frame_error(
|
||||
Reason = #{cause := frame_too_large},
|
||||
Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
|
||||
) when
|
||||
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
|
||||
->
|
||||
ShutdownCount = shutdown_count(frame_error, Reason),
|
||||
case proto_ver(Reason, ConnInfo) of
|
||||
?MQTT_PROTO_V5 ->
|
||||
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
|
||||
_ ->
|
||||
shutdown(ShutdownCount, Channel)
|
||||
end;
|
||||
%% Only send CONNACK with reason code `frame_too_large` for MQTT-v5.0 when connecting,
|
||||
%% otherwise DONOT send any CONNACK or DISCONNECT packet.
|
||||
handle_frame_error(
|
||||
Reason,
|
||||
Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
|
||||
) when
|
||||
is_map(Reason) andalso
|
||||
(ConnState == idle orelse ConnState == connecting)
|
||||
->
|
||||
ShutdownCount = shutdown_count(frame_error, Reason),
|
||||
ProtoVer = proto_ver(Reason, ConnInfo),
|
||||
NChannel = Channel#channel{conninfo = ConnInfo#{proto_ver => ProtoVer}},
|
||||
case ProtoVer of
|
||||
?MQTT_PROTO_V5 ->
|
||||
shutdown(ShutdownCount, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), NChannel);
|
||||
_ ->
|
||||
shutdown(ShutdownCount, NChannel)
|
||||
end;
|
||||
handle_frame_error(
|
||||
Reason,
|
||||
Channel = #channel{conn_state = connecting}
|
||||
) ->
|
||||
shutdown(
|
||||
shutdown_count(frame_error, Reason),
|
||||
?CONNACK_PACKET(?RC_MALFORMED_PACKET),
|
||||
Channel
|
||||
);
|
||||
handle_frame_error(
|
||||
Reason,
|
||||
Channel = #channel{conn_state = ConnState}
|
||||
) when
|
||||
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
|
||||
->
|
||||
handle_out(
|
||||
disconnect,
|
||||
{?RC_MALFORMED_PACKET, Reason},
|
||||
Channel
|
||||
);
|
||||
handle_frame_error(
|
||||
Reason,
|
||||
Channel = #channel{conn_state = disconnected}
|
||||
) ->
|
||||
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
|
||||
{ok, Channel}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle outgoing packet
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -1289,7 +1332,7 @@ handle_info(
|
|||
session = Session
|
||||
}
|
||||
) when
|
||||
ConnState =:= connected orelse ConnState =:= reauthenticating
|
||||
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
|
||||
->
|
||||
{Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
|
||||
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
|
||||
|
@ -2636,8 +2679,7 @@ save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) ->
|
|||
NAliases = maps:put(Topic, AliasId, Aliases),
|
||||
TopicAliases#{outbound => NAliases}.
|
||||
|
||||
-compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}).
|
||||
|
||||
-compile({inline, [reply/2, shutdown/2, shutdown/3]}).
|
||||
reply(Reply, Channel) ->
|
||||
{reply, Reply, Channel}.
|
||||
|
||||
|
@ -2673,13 +2715,13 @@ disconnect_and_shutdown(
|
|||
?IS_MQTT_V5 =
|
||||
#channel{conn_state = ConnState}
|
||||
) when
|
||||
ConnState =:= connected orelse ConnState =:= reauthenticating
|
||||
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
|
||||
->
|
||||
NChannel = ensure_disconnected(Reason, Channel),
|
||||
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
|
||||
%% mqtt v3/v4 connected sessions
|
||||
disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when
|
||||
ConnState =:= connected orelse ConnState =:= reauthenticating
|
||||
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
|
||||
->
|
||||
NChannel = ensure_disconnected(Reason, Channel),
|
||||
shutdown(Reason, Reply, NChannel);
|
||||
|
@ -2722,6 +2764,13 @@ is_durable_session(#channel{session = Session}) ->
|
|||
false
|
||||
end.
|
||||
|
||||
proto_ver(#{proto_ver := ProtoVer}, _ConnInfo) ->
|
||||
ProtoVer;
|
||||
proto_ver(_Reason, #{proto_ver := ProtoVer}) ->
|
||||
ProtoVer;
|
||||
proto_ver(_, _) ->
|
||||
?MQTT_PROTO_V4.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% For CT tests
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -783,7 +783,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|||
input_bytes => Data,
|
||||
parsed_packets => Packets
|
||||
}),
|
||||
{[{frame_error, Reason} | Packets], State};
|
||||
NState = enrich_state(Reason, State),
|
||||
{[{frame_error, Reason} | Packets], NState};
|
||||
error:Reason:Stacktrace ->
|
||||
?LOG(error, #{
|
||||
at_state => emqx_frame:describe_state(ParseState),
|
||||
|
@ -1227,6 +1228,12 @@ inc_counter(Key, Inc) ->
|
|||
_ = emqx_pd:inc_counter(Key, Inc),
|
||||
ok.
|
||||
|
||||
enrich_state(#{parse_state := NParseState}, State) ->
|
||||
Serialize = emqx_frame:serialize_opts(NParseState),
|
||||
State#state{parse_state = NParseState, serialize = Serialize};
|
||||
enrich_state(_, State) ->
|
||||
State.
|
||||
|
||||
set_tcp_keepalive({quic, _Listener}) ->
|
||||
ok;
|
||||
set_tcp_keepalive({Type, Id}) ->
|
||||
|
|
|
@ -117,6 +117,13 @@ try_subscribe(ClientId, Topic) ->
|
|||
write
|
||||
),
|
||||
allow;
|
||||
[#exclusive_subscription{clientid = ClientId, topic = Topic}] ->
|
||||
%% Fixed the issue-13476
|
||||
%% In this feature, the user must manually call `unsubscribe` to release the lock,
|
||||
%% but sometimes the node may go down for some reason,
|
||||
%% then the client will reconnect to this node and resubscribe.
|
||||
%% We need to allow resubscription, otherwise the lock will never be released.
|
||||
allow;
|
||||
[_] ->
|
||||
deny
|
||||
end.
|
||||
|
|
|
@ -43,7 +43,9 @@
|
|||
add_shared_route/2,
|
||||
delete_shared_route/2,
|
||||
add_persistent_route/2,
|
||||
delete_persistent_route/2
|
||||
delete_persistent_route/2,
|
||||
add_persistent_shared_route/3,
|
||||
delete_persistent_shared_route/3
|
||||
]).
|
||||
|
||||
-export_type([dest/0]).
|
||||
|
@ -129,6 +131,12 @@ add_persistent_route(Topic, ID) ->
|
|||
delete_persistent_route(Topic, ID) ->
|
||||
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
|
||||
|
||||
add_persistent_shared_route(Topic, Group, ID) ->
|
||||
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
|
||||
|
||||
delete_persistent_shared_route(Topic, Group, ID) ->
|
||||
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -267,28 +267,50 @@ packet(Header, Variable) ->
|
|||
packet(Header, Variable, Payload) ->
|
||||
#mqtt_packet{header = Header, variable = Variable, payload = Payload}.
|
||||
|
||||
parse_connect(FrameBin, StrictMode) ->
|
||||
{ProtoName, Rest} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
|
||||
case ProtoName of
|
||||
<<"MQTT">> ->
|
||||
ok;
|
||||
<<"MQIsdp">> ->
|
||||
ok;
|
||||
_ ->
|
||||
%% from spec: the server MAY send disconnect with reason code 0x84
|
||||
%% we chose to close socket because the client is likely not talking MQTT anyway
|
||||
?PARSE_ERR(#{
|
||||
cause => invalid_proto_name,
|
||||
expected => <<"'MQTT' or 'MQIsdp'">>,
|
||||
received => ProtoName
|
||||
})
|
||||
end,
|
||||
parse_connect2(ProtoName, Rest, StrictMode).
|
||||
parse_connect(FrameBin, Options = #{strict_mode := StrictMode}) ->
|
||||
{ProtoName, Rest0} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
|
||||
%% No need to parse and check proto_ver if proto_name is invalid, check it first
|
||||
%% And the matching check of `proto_name` and `proto_ver` fields will be done in `emqx_packet:check_proto_ver/2`
|
||||
_ = validate_proto_name(ProtoName),
|
||||
{IsBridge, ProtoVer, Rest2} = parse_connect_proto_ver(Rest0),
|
||||
NOptions = Options#{version => ProtoVer},
|
||||
try
|
||||
do_parse_connect(ProtoName, IsBridge, ProtoVer, Rest2, StrictMode)
|
||||
catch
|
||||
throw:{?FRAME_PARSE_ERROR, ReasonM} when is_map(ReasonM) ->
|
||||
?PARSE_ERR(
|
||||
ReasonM#{
|
||||
proto_ver => ProtoVer,
|
||||
proto_name => ProtoName,
|
||||
parse_state => ?NONE(NOptions)
|
||||
}
|
||||
);
|
||||
throw:{?FRAME_PARSE_ERROR, Reason} ->
|
||||
?PARSE_ERR(
|
||||
#{
|
||||
cause => Reason,
|
||||
proto_ver => ProtoVer,
|
||||
proto_name => ProtoName,
|
||||
parse_state => ?NONE(NOptions)
|
||||
}
|
||||
)
|
||||
end.
|
||||
|
||||
parse_connect2(
|
||||
do_parse_connect(
|
||||
ProtoName,
|
||||
<<BridgeTag:4, ProtoVer:4, UsernameFlagB:1, PasswordFlagB:1, WillRetainB:1, WillQoS:2,
|
||||
WillFlagB:1, CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>,
|
||||
IsBridge,
|
||||
ProtoVer,
|
||||
<<
|
||||
UsernameFlagB:1,
|
||||
PasswordFlagB:1,
|
||||
WillRetainB:1,
|
||||
WillQoS:2,
|
||||
WillFlagB:1,
|
||||
CleanStart:1,
|
||||
Reserved:1,
|
||||
KeepAlive:16/big,
|
||||
Rest/binary
|
||||
>>,
|
||||
StrictMode
|
||||
) ->
|
||||
_ = validate_connect_reserved(Reserved),
|
||||
|
@ -303,14 +325,14 @@ parse_connect2(
|
|||
UsernameFlag = bool(UsernameFlagB),
|
||||
PasswordFlag = bool(PasswordFlagB)
|
||||
),
|
||||
{Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
|
||||
{Properties, Rest3} = parse_properties(Rest, ProtoVer, StrictMode),
|
||||
{ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid),
|
||||
ConnPacket = #mqtt_packet_connect{
|
||||
proto_name = ProtoName,
|
||||
proto_ver = ProtoVer,
|
||||
%% For bridge mode, non-standard implementation
|
||||
%% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html
|
||||
is_bridge = (BridgeTag =:= 8),
|
||||
is_bridge = IsBridge,
|
||||
clean_start = bool(CleanStart),
|
||||
will_flag = WillFlag,
|
||||
will_qos = WillQoS,
|
||||
|
@ -343,16 +365,16 @@ parse_connect2(
|
|||
unexpected_trailing_bytes => size(Rest7)
|
||||
})
|
||||
end;
|
||||
parse_connect2(_ProtoName, Bin, _StrictMode) ->
|
||||
%% sent less than 32 bytes
|
||||
do_parse_connect(_ProtoName, _IsBridge, _ProtoVer, Bin, _StrictMode) ->
|
||||
%% sent less than 24 bytes
|
||||
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
|
||||
|
||||
parse_packet(
|
||||
#mqtt_packet_header{type = ?CONNECT},
|
||||
FrameBin,
|
||||
#{strict_mode := StrictMode}
|
||||
Options
|
||||
) ->
|
||||
parse_connect(FrameBin, StrictMode);
|
||||
parse_connect(FrameBin, Options);
|
||||
parse_packet(
|
||||
#mqtt_packet_header{type = ?CONNACK},
|
||||
<<AckFlags:8, ReasonCode:8, Rest/binary>>,
|
||||
|
@ -516,6 +538,12 @@ parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
|
|||
parse_packet_id(_) ->
|
||||
?PARSE_ERR(invalid_packet_id).
|
||||
|
||||
parse_connect_proto_ver(<<BridgeTag:4, ProtoVer:4, Rest/binary>>) ->
|
||||
{_IsBridge = (BridgeTag =:= 8), ProtoVer, Rest};
|
||||
parse_connect_proto_ver(Bin) ->
|
||||
%% sent less than 1 bytes or empty
|
||||
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
|
||||
|
||||
parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 ->
|
||||
{#{}, Bin};
|
||||
%% TODO: version mess?
|
||||
|
@ -739,6 +767,8 @@ serialize_fun(#{version := Ver, max_size := MaxSize, strict_mode := StrictMode})
|
|||
initial_serialize_opts(Opts) ->
|
||||
maps:merge(?DEFAULT_OPTIONS, Opts).
|
||||
|
||||
serialize_opts(?NONE(Options)) ->
|
||||
maps:merge(?DEFAULT_OPTIONS, Options);
|
||||
serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
|
||||
MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
|
||||
#{version => ProtoVer, max_size => MaxSize, strict_mode => false}.
|
||||
|
@ -1157,18 +1187,34 @@ validate_subqos([3 | _]) -> ?PARSE_ERR(bad_subqos);
|
|||
validate_subqos([_ | T]) -> validate_subqos(T);
|
||||
validate_subqos([]) -> ok.
|
||||
|
||||
%% from spec: the server MAY send disconnect with reason code 0x84
|
||||
%% we chose to close socket because the client is likely not talking MQTT anyway
|
||||
validate_proto_name(<<"MQTT">>) ->
|
||||
ok;
|
||||
validate_proto_name(<<"MQIsdp">>) ->
|
||||
ok;
|
||||
validate_proto_name(ProtoName) ->
|
||||
?PARSE_ERR(#{
|
||||
cause => invalid_proto_name,
|
||||
expected => <<"'MQTT' or 'MQIsdp'">>,
|
||||
received => ProtoName
|
||||
}).
|
||||
|
||||
%% MQTT-v3.1.1-[MQTT-3.1.2-3], MQTT-v5.0-[MQTT-3.1.2-3]
|
||||
-compile({inline, [validate_connect_reserved/1]}).
|
||||
validate_connect_reserved(0) -> ok;
|
||||
validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag).
|
||||
|
||||
-compile({inline, [validate_connect_will/3]}).
|
||||
%% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
|
||||
validate_connect_will(false, _, WillQos) when WillQos > 0 -> ?PARSE_ERR(invalid_will_qos);
|
||||
validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos);
|
||||
%% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
|
||||
validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos);
|
||||
%% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
|
||||
validate_connect_will(false, WillRetain, _) when WillRetain -> ?PARSE_ERR(invalid_will_retain);
|
||||
validate_connect_will(_, _, _) -> ok.
|
||||
|
||||
-compile({inline, [validate_connect_password_flag/4]}).
|
||||
%% MQTT-v3.1
|
||||
%% Username flag and password flag are not strongly related
|
||||
%% https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect
|
||||
|
@ -1183,6 +1229,7 @@ validate_connect_password_flag(true, ?MQTT_PROTO_V5, _, _) ->
|
|||
validate_connect_password_flag(_, _, _, _) ->
|
||||
ok.
|
||||
|
||||
-compile({inline, [bool/1]}).
|
||||
bool(0) -> false;
|
||||
bool(1) -> true.
|
||||
|
||||
|
|
|
@ -432,7 +432,7 @@ do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTE
|
|||
esockd:open(
|
||||
Id,
|
||||
ListenOn,
|
||||
merge_default(esockd_opts(Id, Type, Name, Opts))
|
||||
merge_default(esockd_opts(Id, Type, Name, Opts, _OldOpts = undefined))
|
||||
);
|
||||
%% Start MQTT/WS listener
|
||||
do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
|
||||
|
@ -476,7 +476,7 @@ do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when
|
|||
Id = listener_id(Type, Name),
|
||||
case maps:get(bind, OldConf) of
|
||||
ListenOn ->
|
||||
esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
|
||||
esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf, OldConf));
|
||||
_Different ->
|
||||
%% TODO
|
||||
%% Again, we're not strictly required to drop live connections in this case.
|
||||
|
@ -588,7 +588,7 @@ perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) ->
|
|||
perform_listener_change(stop, {Type, Name, Conf}) ->
|
||||
stop_listener(Type, Name, Conf).
|
||||
|
||||
esockd_opts(ListenerId, Type, Name, Opts0) ->
|
||||
esockd_opts(ListenerId, Type, Name, Opts0, OldOpts) ->
|
||||
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
|
||||
Limiter = limiter(Opts0),
|
||||
Opts2 =
|
||||
|
@ -620,7 +620,7 @@ esockd_opts(ListenerId, Type, Name, Opts0) ->
|
|||
tcp ->
|
||||
Opts3#{tcp_options => tcp_opts(Opts0)};
|
||||
ssl ->
|
||||
OptsWithCRL = inject_crl_config(Opts0),
|
||||
OptsWithCRL = inject_crl_config(Opts0, OldOpts),
|
||||
OptsWithSNI = inject_sni_fun(ListenerId, OptsWithCRL),
|
||||
OptsWithRootFun = inject_root_fun(OptsWithSNI),
|
||||
OptsWithVerifyFun = inject_verify_fun(OptsWithRootFun),
|
||||
|
@ -996,7 +996,7 @@ inject_sni_fun(_ListenerId, Conf) ->
|
|||
Conf.
|
||||
|
||||
inject_crl_config(
|
||||
Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}
|
||||
Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}, _OldOpts
|
||||
) ->
|
||||
HTTPTimeout = emqx_config:get([crl_cache, http_timeout], timer:seconds(15)),
|
||||
Conf#{
|
||||
|
@ -1006,7 +1006,16 @@ inject_crl_config(
|
|||
crl_cache => {emqx_ssl_crl_cache, {internal, [{http, HTTPTimeout}]}}
|
||||
}
|
||||
};
|
||||
inject_crl_config(Conf) ->
|
||||
inject_crl_config(#{ssl_options := SSLOpts0} = Conf0, #{} = OldOpts) ->
|
||||
%% Note: we must set crl options to `undefined' to unset them. Otherwise,
|
||||
%% `esockd' will retain such options when `esockd:merge_opts/2' is called and the SSL
|
||||
%% options were previously enabled.
|
||||
WasEnabled = emqx_utils_maps:deep_get([ssl_options, enable_crl_check], OldOpts, false),
|
||||
Undefine = fun(Acc, K) -> emqx_utils_maps:put_if(Acc, K, undefined, WasEnabled) end,
|
||||
SSLOpts1 = Undefine(SSLOpts0, crl_check),
|
||||
SSLOpts = Undefine(SSLOpts1, crl_cache),
|
||||
Conf0#{ssl_options := SSLOpts};
|
||||
inject_crl_config(Conf, undefined = _OldOpts) ->
|
||||
Conf.
|
||||
|
||||
maybe_unregister_ocsp_stapling_refresh(
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
-export([start_link/0]).
|
||||
|
||||
%% throttler API
|
||||
-export([allow/1]).
|
||||
-export([allow/2]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([
|
||||
|
@ -40,23 +40,29 @@
|
|||
-define(SEQ_ID(Msg), {?MODULE, Msg}).
|
||||
-define(NEW_SEQ, atomics:new(1, [{signed, false}])).
|
||||
-define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)).
|
||||
-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))).
|
||||
-define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)).
|
||||
-define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)).
|
||||
-define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1).
|
||||
-define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1).
|
||||
|
||||
-define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)).
|
||||
|
||||
-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
|
||||
-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
|
||||
|
||||
-spec allow(atom()) -> boolean().
|
||||
allow(Msg) when is_atom(Msg) ->
|
||||
%% @doc Check if a throttled log message is allowed to pass down to the logger this time.
|
||||
%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined'
|
||||
%% for predefined message IDs.
|
||||
%% For relatively static resources created from configurations such as data integration
|
||||
%% resource IDs `UniqueKey' should be of `binary()' type.
|
||||
-spec allow(atom(), undefined | binary()) -> boolean().
|
||||
allow(Msg, UniqueKey) when
|
||||
is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined)
|
||||
->
|
||||
case emqx_logger:get_primary_log_level() of
|
||||
debug ->
|
||||
true;
|
||||
_ ->
|
||||
do_allow(Msg)
|
||||
do_allow(Msg, UniqueKey)
|
||||
end.
|
||||
|
||||
-spec start_link() -> startlink_ret().
|
||||
|
@ -68,7 +74,8 @@ start_link() ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
|
||||
process_flag(trap_exit, true),
|
||||
ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST),
|
||||
CurrentPeriodMs = ?TIME_WINDOW_MS,
|
||||
TimerRef = schedule_refresh(CurrentPeriodMs),
|
||||
{ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
|
||||
|
@ -86,16 +93,22 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
|
|||
DroppedStats = lists:foldl(
|
||||
fun(Msg, Acc) ->
|
||||
case ?GET_SEQ(Msg) of
|
||||
%% Should not happen, unless the static ids list is updated at run-time.
|
||||
undefined ->
|
||||
?NEW_THROTTLE(Msg, ?NEW_SEQ),
|
||||
%% Should not happen, unless the static ids list is updated at run-time.
|
||||
new_throttler(Msg),
|
||||
?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
|
||||
Acc;
|
||||
SeqMap when is_map(SeqMap) ->
|
||||
maps:fold(
|
||||
fun(Key, Ref, Acc0) ->
|
||||
ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]),
|
||||
drop_stats(Ref, ID, Acc0)
|
||||
end,
|
||||
Acc,
|
||||
SeqMap
|
||||
);
|
||||
SeqRef ->
|
||||
Dropped = ?GET_DROPPED(SeqRef),
|
||||
ok = ?RESET_SEQ(SeqRef),
|
||||
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
|
||||
maybe_add_dropped(Msg, Dropped, Acc)
|
||||
drop_stats(SeqRef, Msg, Acc)
|
||||
end
|
||||
end,
|
||||
#{},
|
||||
|
@ -112,7 +125,16 @@ handle_info(Info, State) ->
|
|||
?SLOG(error, #{msg => "unxpected_info", info => Info}),
|
||||
{noreply, State}.
|
||||
|
||||
drop_stats(SeqRef, Msg, Acc) ->
|
||||
Dropped = ?GET_DROPPED(SeqRef),
|
||||
ok = ?RESET_SEQ(SeqRef),
|
||||
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
|
||||
maybe_add_dropped(Msg, Dropped, Acc).
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
%% atomics do not have delete/remove/release/deallocate API
|
||||
%% after the reference is garbage-collected the resource is released
|
||||
lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
|
@ -122,17 +144,27 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
do_allow(Msg) ->
|
||||
do_allow(Msg, UniqueKey) ->
|
||||
case persistent_term:get(?SEQ_ID(Msg), undefined) of
|
||||
undefined ->
|
||||
%% This is either a race condition (emqx_log_throttler is not started yet)
|
||||
%% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is
|
||||
%% not added to the default value of `log.throttling.msgs`.
|
||||
?SLOG(info, #{
|
||||
msg => "missing_log_throttle_sequence",
|
||||
?SLOG(debug, #{
|
||||
msg => "log_throttle_disabled",
|
||||
throttled_msg => Msg
|
||||
}),
|
||||
true;
|
||||
%% e.g: unrecoverable msg throttle according resource_id
|
||||
SeqMap when is_map(SeqMap) ->
|
||||
case maps:find(UniqueKey, SeqMap) of
|
||||
{ok, SeqRef} ->
|
||||
?IS_ALLOWED(SeqRef);
|
||||
error ->
|
||||
SeqRef = ?NEW_SEQ,
|
||||
new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}),
|
||||
true
|
||||
end;
|
||||
SeqRef ->
|
||||
?IS_ALLOWED(SeqRef)
|
||||
end.
|
||||
|
@ -154,3 +186,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) ->
|
|||
schedule_refresh(PeriodMs) ->
|
||||
?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}),
|
||||
erlang:send_after(PeriodMs, ?MODULE, refresh).
|
||||
|
||||
new_throttler(unrecoverable_resource_error = Msg) ->
|
||||
new_throttler(Msg, #{});
|
||||
new_throttler(Msg) ->
|
||||
new_throttler(Msg, ?NEW_SEQ).
|
||||
|
||||
new_throttler(Msg, AtomicOrEmptyMap) ->
|
||||
persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap).
|
||||
|
|
|
@ -105,7 +105,7 @@ format(Msg, Meta, Config) ->
|
|||
maybe_format_msg(undefined, _Meta, _Config) ->
|
||||
#{};
|
||||
maybe_format_msg({report, Report0} = Msg, #{report_cb := Cb} = Meta, Config) ->
|
||||
Report = emqx_logger_textfmt:try_encode_payload(Report0, Config),
|
||||
Report = emqx_logger_textfmt:try_encode_meta(Report0, Config),
|
||||
case is_map(Report) andalso Cb =:= ?DEFAULT_FORMATTER of
|
||||
true ->
|
||||
%% reporting a map without a customised format function
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
-export([format/2]).
|
||||
-export([check_config/1]).
|
||||
-export([try_format_unicode/1, try_encode_payload/2]).
|
||||
-export([try_format_unicode/1, try_encode_meta/2]).
|
||||
%% Used in the other log formatters
|
||||
-export([evaluate_lazy_values_if_dbg_level/1, evaluate_lazy_values/1]).
|
||||
|
||||
|
@ -111,7 +111,7 @@ is_list_report_acceptable(_) ->
|
|||
enrich_report(ReportRaw0, Meta, Config) ->
|
||||
%% clientid and peername always in emqx_conn's process metadata.
|
||||
%% topic and username can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2
|
||||
ReportRaw = try_encode_payload(ReportRaw0, Config),
|
||||
ReportRaw = try_encode_meta(ReportRaw0, Config),
|
||||
Topic =
|
||||
case maps:get(topic, Meta, undefined) of
|
||||
undefined -> maps:get(topic, ReportRaw, undefined);
|
||||
|
@ -180,9 +180,22 @@ enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) ->
|
|||
enrich_topic(Msg, _) ->
|
||||
Msg.
|
||||
|
||||
try_encode_payload(#{payload := Payload} = Report, #{payload_encode := Encode}) ->
|
||||
try_encode_meta(Report, Config) ->
|
||||
lists:foldl(
|
||||
fun(Meta, Acc) ->
|
||||
try_encode_meta(Meta, Acc, Config)
|
||||
end,
|
||||
Report,
|
||||
[payload, packet]
|
||||
).
|
||||
|
||||
try_encode_meta(payload, #{payload := Payload} = Report, #{payload_encode := Encode}) ->
|
||||
Report#{payload := encode_payload(Payload, Encode)};
|
||||
try_encode_payload(Report, _Config) ->
|
||||
try_encode_meta(packet, #{packet := Packet} = Report, #{payload_encode := Encode}) when
|
||||
is_tuple(Packet)
|
||||
->
|
||||
Report#{packet := emqx_packet:format(Packet, Encode)};
|
||||
try_encode_meta(_, Report, _Config) ->
|
||||
Report.
|
||||
|
||||
encode_payload(Payload, text) ->
|
||||
|
@ -190,4 +203,5 @@ encode_payload(Payload, text) ->
|
|||
encode_payload(_Payload, hidden) ->
|
||||
"******";
|
||||
encode_payload(Payload, hex) ->
|
||||
binary:encode_hex(Payload).
|
||||
Bin = emqx_utils_conv:bin(Payload),
|
||||
binary:encode_hex(Bin).
|
||||
|
|
|
@ -51,7 +51,6 @@
|
|||
]).
|
||||
|
||||
-export([
|
||||
format/1,
|
||||
format/2
|
||||
]).
|
||||
|
||||
|
@ -481,10 +480,6 @@ will_msg(#mqtt_packet_connect{
|
|||
headers = #{username => Username, properties => Props}
|
||||
}.
|
||||
|
||||
%% @doc Format packet
|
||||
-spec format(emqx_types:packet()) -> iolist().
|
||||
format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()).
|
||||
|
||||
%% @doc Format packet
|
||||
-spec format(emqx_types:packet(), hex | text | hidden) -> iolist().
|
||||
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) ->
|
||||
|
|
|
@ -621,9 +621,13 @@ handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
|
|||
Session = replay_streams(Session0, ClientInfo),
|
||||
{ok, [], Session};
|
||||
handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) ->
|
||||
S1 = emqx_persistent_session_ds_subs:gc(S0),
|
||||
S2 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
|
||||
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S2, SharedSubS0),
|
||||
%% `gc` and `renew_streams` methods may drop unsubscribed streams.
|
||||
%% Shared subscription handler must have a chance to see unsubscribed streams
|
||||
%% in the fully replayed state.
|
||||
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:pre_renew_streams(S0, SharedSubS0),
|
||||
S2 = emqx_persistent_session_ds_subs:gc(S1),
|
||||
S3 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2),
|
||||
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S3, SharedSubS1),
|
||||
Interval = get_config(ClientInfo, [renew_streams_interval]),
|
||||
Session = emqx_session:ensure_timer(
|
||||
?TIMER_GET_STREAMS,
|
||||
|
@ -757,7 +761,7 @@ skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
|
||||
disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
|
||||
disconnect(Session = #{id := Id, s := S0, shared_sub_s := SharedSubS0}, ConnInfo) ->
|
||||
S1 = maybe_set_offline_info(S0, Id),
|
||||
S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1),
|
||||
S3 =
|
||||
|
@ -767,8 +771,9 @@ disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
|
|||
_ ->
|
||||
S2
|
||||
end,
|
||||
S = emqx_persistent_session_ds_state:commit(S3),
|
||||
{shutdown, Session#{s => S}}.
|
||||
{S4, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_disconnect(S3, SharedSubS0),
|
||||
S = emqx_persistent_session_ds_state:commit(S4),
|
||||
{shutdown, Session#{s => S, shared_sub_s => SharedSubS}}.
|
||||
|
||||
-spec terminate(Reason :: term(), session()) -> ok.
|
||||
terminate(_Reason, Session = #{id := Id, s := S}) ->
|
||||
|
@ -816,10 +821,12 @@ list_client_subscriptions(ClientId) ->
|
|||
{error, not_found}
|
||||
end.
|
||||
|
||||
-spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) ->
|
||||
-spec get_client_subscription(emqx_types:clientid(), topic_filter() | share_topic_filter()) ->
|
||||
subscription() | undefined.
|
||||
get_client_subscription(ClientId, Topic) ->
|
||||
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic).
|
||||
get_client_subscription(ClientId, #share{} = ShareTopicFilter) ->
|
||||
emqx_persistent_session_ds_shared_subs:cold_get_subscription(ClientId, ShareTopicFilter);
|
||||
get_client_subscription(ClientId, TopicFilter) ->
|
||||
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, TopicFilter).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Session tables operations
|
||||
|
@ -986,14 +993,14 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
|
|||
%% Normal replay:
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
fetch_new_messages(Session0 = #{s := S0}, ClientInfo) ->
|
||||
LFS = maps:get(last_fetched_stream, Session0, beginning),
|
||||
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S0),
|
||||
fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) ->
|
||||
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0),
|
||||
Session1 = Session0#{s => S1, shared_sub_s => SharedSubS1},
|
||||
LFS = maps:get(last_fetched_stream, Session1, beginning),
|
||||
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1),
|
||||
BatchSize = get_config(ClientInfo, [batch_size]),
|
||||
Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo),
|
||||
#{s := S1, shared_sub_s := SharedSubS0} = Session1,
|
||||
{S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0),
|
||||
Session1#{s => S2, shared_sub_s => SharedSubS1}.
|
||||
Session2 = fetch_new_messages(ItStream, BatchSize, Session1, ClientInfo),
|
||||
Session2#{shared_sub_s => SharedSubS1}.
|
||||
|
||||
fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
|
||||
#{inflight := Inflight} = Session0,
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
-module(emqx_persistent_session_ds_router).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
|
||||
-include("emqx_ps_ds_int.hrl").
|
||||
|
||||
-export([init_tables/0]).
|
||||
|
||||
|
@ -47,7 +47,7 @@
|
|||
-endif.
|
||||
|
||||
-type route() :: #ps_route{}.
|
||||
-type dest() :: emqx_persistent_session_ds:id().
|
||||
-type dest() :: emqx_persistent_session_ds:id() | #share_dest{}.
|
||||
|
||||
-export_type([dest/0, route/0]).
|
||||
|
||||
|
@ -161,7 +161,7 @@ topics() ->
|
|||
print_routes(Topic) ->
|
||||
lists:foreach(
|
||||
fun(#ps_route{topic = To, dest = Dest}) ->
|
||||
io:format("~ts -> ~ts~n", [To, Dest])
|
||||
io:format("~ts -> ~tp~n", [To, Dest])
|
||||
end,
|
||||
match_routes(Topic)
|
||||
).
|
||||
|
@ -247,6 +247,8 @@ mk_filtertab_fold_fun(FoldFun) ->
|
|||
match_filters(Topic) ->
|
||||
emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []).
|
||||
|
||||
get_dest_session_id(#share_dest{session_id = DSSessionId}) ->
|
||||
DSSessionId;
|
||||
get_dest_session_id({_, DSSessionId}) ->
|
||||
DSSessionId;
|
||||
get_dest_session_id(DSSessionId) ->
|
||||
|
|
|
@ -2,11 +2,37 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc This module
|
||||
%% * handles creation and management of _shared_ subscriptions for the session;
|
||||
%% * provides streams to the session;
|
||||
%% * handles progress of stream replay.
|
||||
%%
|
||||
%% The logic is quite straightforward; most of the parts resemble the logic of the
|
||||
%% `emqx_persistent_session_ds_subs` (subscribe/unsubscribe) and
|
||||
%% `emqx_persistent_session_ds_scheduler` (providing new streams),
|
||||
%% but some data is sent or received from the `emqx_persistent_session_ds_shared_subs_agent`
|
||||
%% which communicates with remote shared subscription leaders.
|
||||
%%
|
||||
%% A tricky part is the concept of "scheduled actions". When we unsubscribe from a topic
|
||||
%% we may have some streams that have unacked messages. So we do not have a reliable
|
||||
%% progress for them. Sending the current progress to the leader and disconnecting
|
||||
%% will lead to the duplication of messages. So after unsubscription, we need to wait
|
||||
%% some time until all streams are acked, and only then we disconnect from the leader.
|
||||
%%
|
||||
%% For this purpose we have the `scheduled_actions` map in the state of the module.
|
||||
%% We preserve there the streams that we need to wait for and collect their progress.
|
||||
%% We also use `scheduled_actions` for resubscriptions. If a client quickly resubscribes
|
||||
%% after unsubscription, we may still have the mentioned streams unacked. If we abandon
|
||||
%% them, just connect to the leader, then it may lease us the same streams again, but with
|
||||
%% the previous progress. So messages may duplicate.
|
||||
|
||||
-module(emqx_persistent_session_ds_shared_subs).
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
-include("session_internals.hrl").
|
||||
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
|
||||
-export([
|
||||
|
@ -15,16 +41,56 @@
|
|||
|
||||
on_subscribe/3,
|
||||
on_unsubscribe/4,
|
||||
on_disconnect/2,
|
||||
|
||||
on_streams_replayed/2,
|
||||
on_streams_replay/2,
|
||||
on_info/3,
|
||||
|
||||
pre_renew_streams/2,
|
||||
renew_streams/2,
|
||||
to_map/2
|
||||
]).
|
||||
|
||||
%% Management API:
|
||||
-export([
|
||||
cold_get_subscription/2
|
||||
]).
|
||||
|
||||
-export([
|
||||
format_lease_events/1,
|
||||
format_stream_progresses/1
|
||||
]).
|
||||
|
||||
-define(schedule_subscribe, schedule_subscribe).
|
||||
-define(schedule_unsubscribe, schedule_unsubscribe).
|
||||
|
||||
-type stream_key() :: {emqx_persistent_session_ds:id(), emqx_ds:stream()}.
|
||||
|
||||
-type scheduled_action_type() ::
|
||||
{?schedule_subscribe, emqx_types:subopts()} | ?schedule_unsubscribe.
|
||||
|
||||
-type agent_stream_progress() :: #{
|
||||
stream := emqx_ds:stream(),
|
||||
progress := progress(),
|
||||
use_finished := boolean()
|
||||
}.
|
||||
|
||||
-type progress() ::
|
||||
#{
|
||||
iterator := emqx_ds:iterator()
|
||||
}.
|
||||
|
||||
-type scheduled_action() :: #{
|
||||
type := scheduled_action_type(),
|
||||
stream_keys_to_wait := [stream_key()],
|
||||
progresses := [agent_stream_progress()]
|
||||
}.
|
||||
|
||||
-type t() :: #{
|
||||
agent := emqx_persistent_session_ds_shared_subs_agent:t()
|
||||
agent := emqx_persistent_session_ds_shared_subs_agent:t(),
|
||||
scheduled_actions := #{
|
||||
share_topic_filter() => scheduled_action()
|
||||
}
|
||||
}.
|
||||
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
|
||||
-type opts() :: #{
|
||||
|
@ -34,184 +100,90 @@
|
|||
-define(rank_x, rank_shared).
|
||||
-define(rank_y, 0).
|
||||
|
||||
-export_type([
|
||||
progress/0,
|
||||
agent_stream_progress/0
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% new
|
||||
|
||||
-spec new(opts()) -> t().
|
||||
new(Opts) ->
|
||||
#{
|
||||
agent => emqx_persistent_session_ds_shared_subs_agent:new(
|
||||
agent_opts(Opts)
|
||||
)
|
||||
),
|
||||
scheduled_actions => #{}
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% open
|
||||
|
||||
-spec open(emqx_persistent_session_ds_state:t(), opts()) ->
|
||||
{ok, emqx_persistent_session_ds_state:t(), t()}.
|
||||
open(S, Opts) ->
|
||||
open(S0, Opts) ->
|
||||
SharedSubscriptions = fold_shared_subs(
|
||||
fun(#share{} = TopicFilter, Sub, Acc) ->
|
||||
[{TopicFilter, to_agent_subscription(S, Sub)} | Acc]
|
||||
fun(#share{} = ShareTopicFilter, Sub, Acc) ->
|
||||
[{ShareTopicFilter, to_agent_subscription(S0, Sub)} | Acc]
|
||||
end,
|
||||
[],
|
||||
S
|
||||
S0
|
||||
),
|
||||
Agent = emqx_persistent_session_ds_shared_subs_agent:open(
|
||||
SharedSubscriptions, agent_opts(Opts)
|
||||
),
|
||||
SharedSubS = #{agent => Agent},
|
||||
{ok, S, SharedSubS}.
|
||||
SharedSubS = #{agent => Agent, scheduled_actions => #{}},
|
||||
S1 = revoke_all_streams(S0),
|
||||
{ok, S1, SharedSubS}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% on_subscribe
|
||||
|
||||
-spec on_subscribe(
|
||||
share_topic_filter(),
|
||||
emqx_types:subopts(),
|
||||
emqx_persistent_session_ds:session()
|
||||
) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}.
|
||||
on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) ->
|
||||
Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S),
|
||||
on_subscribe(Subscription, TopicFilter, SubOpts, Session).
|
||||
|
||||
-spec on_unsubscribe(
|
||||
emqx_persistent_session_ds:id(),
|
||||
emqx_persistent_session_ds:topic_filter(),
|
||||
emqx_persistent_session_ds_state:t(),
|
||||
t()
|
||||
) ->
|
||||
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
|
||||
| {error, emqx_types:reason_code()}.
|
||||
on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) ->
|
||||
case lookup(TopicFilter, S0) of
|
||||
undefined ->
|
||||
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
|
||||
Subscription ->
|
||||
?tp(persistent_session_ds_subscription_delete, #{
|
||||
session_id => SessionId, topic_filter => TopicFilter
|
||||
}),
|
||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
|
||||
Agent0, TopicFilter
|
||||
),
|
||||
SharedSubS = SharedSubS0#{agent => Agent1},
|
||||
S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0),
|
||||
{ok, S, SharedSubS, Subscription}
|
||||
end.
|
||||
|
||||
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
|
||||
{emqx_persistent_session_ds_state:t(), t()}.
|
||||
renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
|
||||
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
|
||||
Agent0
|
||||
),
|
||||
?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}),
|
||||
S1 = lists:foldl(
|
||||
fun
|
||||
(#{type := lease} = Event, S) -> accept_stream(Event, S);
|
||||
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
|
||||
end,
|
||||
S0,
|
||||
StreamLeaseEvents
|
||||
),
|
||||
SharedSubS1 = SharedSubS0#{agent => Agent1},
|
||||
{S1, SharedSubS1}.
|
||||
|
||||
-spec on_streams_replayed(
|
||||
emqx_persistent_session_ds_state:t(),
|
||||
t()
|
||||
) -> {emqx_persistent_session_ds_state:t(), t()}.
|
||||
on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
|
||||
%% TODO
|
||||
%% Is it sufficient for a report?
|
||||
Progress = fold_shared_stream_states(
|
||||
fun(TopicFilter, Stream, SRS, Acc) ->
|
||||
#srs{it_begin = BeginIt} = SRS,
|
||||
StreamProgress = #{
|
||||
topic_filter => TopicFilter,
|
||||
stream => Stream,
|
||||
iterator => BeginIt
|
||||
},
|
||||
[StreamProgress | Acc]
|
||||
end,
|
||||
[],
|
||||
S
|
||||
),
|
||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
|
||||
Agent0, Progress
|
||||
),
|
||||
SharedSubS1 = SharedSubS0#{agent => Agent1},
|
||||
{S, SharedSubS1}.
|
||||
|
||||
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
|
||||
{emqx_persistent_session_ds_state:t(), t()}.
|
||||
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
|
||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
|
||||
SharedSubS1 = SharedSubS0#{agent => Agent1},
|
||||
{S, SharedSubS1}.
|
||||
|
||||
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
|
||||
to_map(_S, _SharedSubS) ->
|
||||
%% TODO
|
||||
#{}.
|
||||
on_subscribe(#share{} = ShareTopicFilter, SubOpts, #{s := S} = Session) ->
|
||||
Subscription = emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S),
|
||||
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
%% on_subscribe internal functions
|
||||
|
||||
fold_shared_subs(Fun, Acc, S) ->
|
||||
emqx_persistent_session_ds_state:fold_subscriptions(
|
||||
fun
|
||||
(#share{} = TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0);
|
||||
(_, _Sub, Acc0) -> Acc0
|
||||
end,
|
||||
Acc,
|
||||
S
|
||||
).
|
||||
|
||||
fold_shared_stream_states(Fun, Acc, S) ->
|
||||
%% TODO
|
||||
%% Optimize or cache
|
||||
TopicFilters = fold_shared_subs(
|
||||
fun
|
||||
(#share{} = TopicFilter, #{id := Id} = _Sub, Acc0) ->
|
||||
Acc0#{Id => TopicFilter};
|
||||
(_, _, Acc0) ->
|
||||
Acc0
|
||||
end,
|
||||
#{},
|
||||
S
|
||||
),
|
||||
emqx_persistent_session_ds_state:fold_streams(
|
||||
fun({SubId, Stream}, SRS, Acc0) ->
|
||||
case TopicFilters of
|
||||
#{SubId := TopicFilter} ->
|
||||
Fun(TopicFilter, Stream, SRS, Acc0);
|
||||
_ ->
|
||||
Acc0
|
||||
end
|
||||
end,
|
||||
Acc,
|
||||
S
|
||||
).
|
||||
|
||||
on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
|
||||
on_subscribe(undefined, ShareTopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
|
||||
#{max_subscriptions := MaxSubscriptions} = Props,
|
||||
case emqx_persistent_session_ds_state:n_subscriptions(S) < MaxSubscriptions of
|
||||
true ->
|
||||
create_new_subscription(TopicFilter, SubOpts, Session);
|
||||
create_new_subscription(ShareTopicFilter, SubOpts, Session);
|
||||
false ->
|
||||
{error, ?RC_QUOTA_EXCEEDED}
|
||||
end;
|
||||
on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
|
||||
update_subscription(Subscription, TopicFilter, SubOpts, Session).
|
||||
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) ->
|
||||
update_subscription(Subscription, ShareTopicFilter, SubOpts, Session).
|
||||
|
||||
-dialyzer({nowarn_function, create_new_subscription/3}).
|
||||
create_new_subscription(TopicFilter, SubOpts, #{
|
||||
id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props
|
||||
create_new_subscription(#share{topic = TopicFilter, group = Group} = ShareTopicFilter, SubOpts, #{
|
||||
id := SessionId,
|
||||
s := S0,
|
||||
shared_sub_s := #{agent := Agent} = SharedSubS0,
|
||||
props := Props
|
||||
}) ->
|
||||
case
|
||||
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
|
||||
Agent0, TopicFilter, SubOpts
|
||||
emqx_persistent_session_ds_shared_subs_agent:can_subscribe(
|
||||
Agent, ShareTopicFilter, SubOpts
|
||||
)
|
||||
of
|
||||
{ok, Agent1} ->
|
||||
ok ->
|
||||
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, #share_dest{
|
||||
session_id = SessionId, group = Group
|
||||
}),
|
||||
_ = emqx_external_broker:add_persistent_shared_route(TopicFilter, Group, SessionId),
|
||||
#{upgrade_qos := UpgradeQoS} = Props,
|
||||
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
|
||||
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
|
||||
|
@ -227,20 +199,20 @@ create_new_subscription(TopicFilter, SubOpts, #{
|
|||
start_time => now_ms()
|
||||
},
|
||||
S = emqx_persistent_session_ds_state:put_subscription(
|
||||
TopicFilter, Subscription, S3
|
||||
ShareTopicFilter, Subscription, S3
|
||||
),
|
||||
SharedSubS = SharedSubS0#{agent => Agent1},
|
||||
?tp(persistent_session_ds_shared_subscription_added, #{
|
||||
topic_filter => TopicFilter, session => SessionId
|
||||
}),
|
||||
|
||||
SharedSubS = schedule_subscribe(SharedSubS0, ShareTopicFilter, SubOpts),
|
||||
{ok, S, SharedSubS};
|
||||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilter, SubOpts, #{
|
||||
update_subscription(
|
||||
#{current_state := SStateId0, id := SubId} = Sub0, ShareTopicFilter, SubOpts, #{
|
||||
s := S0, shared_sub_s := SharedSubS, props := Props
|
||||
}) ->
|
||||
}
|
||||
) ->
|
||||
#{upgrade_qos := UpgradeQoS} = Props,
|
||||
SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
|
||||
case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
|
||||
|
@ -254,36 +226,173 @@ update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilt
|
|||
SStateId, SState, S1
|
||||
),
|
||||
Sub = Sub0#{current_state => SStateId},
|
||||
S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
|
||||
S = emqx_persistent_session_ds_state:put_subscription(ShareTopicFilter, Sub, S2),
|
||||
{ok, S, SharedSubS}
|
||||
end.
|
||||
|
||||
lookup(TopicFilter, S) ->
|
||||
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of
|
||||
Sub = #{current_state := SStateId} ->
|
||||
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
|
||||
#{subopts := SubOpts} ->
|
||||
Sub#{subopts => SubOpts};
|
||||
-dialyzer({nowarn_function, schedule_subscribe/3}).
|
||||
schedule_subscribe(
|
||||
#{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0,
|
||||
ShareTopicFilter,
|
||||
SubOpts
|
||||
) ->
|
||||
case ScheduledActions0 of
|
||||
#{ShareTopicFilter := ScheduledAction} ->
|
||||
ScheduledActions1 = ScheduledActions0#{
|
||||
ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
|
||||
},
|
||||
?tp(debug, shared_subs_schedule_subscribe_override, #{
|
||||
share_topic_filter => ShareTopicFilter,
|
||||
new_type => {?schedule_subscribe, SubOpts},
|
||||
old_action => format_schedule_action(ScheduledAction)
|
||||
}),
|
||||
SharedSubS0#{scheduled_actions := ScheduledActions1};
|
||||
_ ->
|
||||
?tp(debug, shared_subs_schedule_subscribe_new, #{
|
||||
share_topic_filter => ShareTopicFilter, subopts => SubOpts
|
||||
}),
|
||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
|
||||
Agent0, ShareTopicFilter, SubOpts
|
||||
),
|
||||
SharedSubS0#{agent => Agent1}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% on_unsubscribe
|
||||
|
||||
-spec on_unsubscribe(
|
||||
emqx_persistent_session_ds:id(),
|
||||
share_topic_filter(),
|
||||
emqx_persistent_session_ds_state:t(),
|
||||
t()
|
||||
) ->
|
||||
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
|
||||
| {error, emqx_types:reason_code()}.
|
||||
on_unsubscribe(
|
||||
SessionId, #share{topic = TopicFilter, group = Group} = ShareTopicFilter, S0, SharedSubS0
|
||||
) ->
|
||||
case lookup(ShareTopicFilter, S0) of
|
||||
undefined ->
|
||||
undefined
|
||||
end;
|
||||
undefined ->
|
||||
undefined
|
||||
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
|
||||
#{id := SubId} = Subscription ->
|
||||
?tp(persistent_session_ds_subscription_delete, #{
|
||||
session_id => SessionId, share_topic_filter => ShareTopicFilter
|
||||
}),
|
||||
_ = emqx_external_broker:delete_persistent_shared_route(TopicFilter, Group, SessionId),
|
||||
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, #share_dest{
|
||||
session_id = SessionId, group = Group
|
||||
}),
|
||||
S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0),
|
||||
SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter),
|
||||
{ok, S, SharedSubS, Subscription}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% on_unsubscribe internal functions
|
||||
|
||||
schedule_unsubscribe(
|
||||
S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, ShareTopicFilter
|
||||
) ->
|
||||
case ScheduledActions0 of
|
||||
#{ShareTopicFilter := ScheduledAction0} ->
|
||||
ScheduledAction1 = ScheduledAction0#{type => ?schedule_unsubscribe},
|
||||
ScheduledActions1 = ScheduledActions0#{
|
||||
ShareTopicFilter => ScheduledAction1
|
||||
},
|
||||
?tp(debug, shared_subs_schedule_unsubscribe_override, #{
|
||||
share_topic_filter => ShareTopicFilter,
|
||||
new_type => ?schedule_unsubscribe,
|
||||
old_action => format_schedule_action(ScheduledAction0)
|
||||
}),
|
||||
SharedSubS0#{scheduled_actions := ScheduledActions1};
|
||||
_ ->
|
||||
StreamKeys = stream_keys_by_sub_id(S, UnsubscridedSubId),
|
||||
ScheduledActions1 = ScheduledActions0#{
|
||||
ShareTopicFilter => #{
|
||||
type => ?schedule_unsubscribe,
|
||||
stream_keys_to_wait => StreamKeys,
|
||||
progresses => []
|
||||
}
|
||||
},
|
||||
?tp(debug, shared_subs_schedule_unsubscribe_new, #{
|
||||
share_topic_filter => ShareTopicFilter,
|
||||
stream_keys => format_stream_keys(StreamKeys)
|
||||
}),
|
||||
SharedSubS0#{scheduled_actions := ScheduledActions1}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% pre_renew_streams
|
||||
|
||||
-spec pre_renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
|
||||
{emqx_persistent_session_ds_state:t(), t()}.
|
||||
pre_renew_streams(S, SharedSubS) ->
|
||||
on_streams_replay(S, SharedSubS).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% renew_streams
|
||||
|
||||
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
|
||||
{emqx_persistent_session_ds_state:t(), t()}.
|
||||
renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = SharedSubS0) ->
|
||||
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
|
||||
Agent0
|
||||
),
|
||||
StreamLeaseEvents =/= [] andalso
|
||||
?tp(debug, shared_subs_new_stream_lease_events, #{
|
||||
stream_lease_events => format_lease_events(StreamLeaseEvents)
|
||||
}),
|
||||
S1 = lists:foldl(
|
||||
fun
|
||||
(#{type := lease} = Event, S) -> accept_stream(Event, S, ScheduledActions);
|
||||
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
|
||||
end,
|
||||
S0,
|
||||
StreamLeaseEvents
|
||||
),
|
||||
SharedSubS1 = SharedSubS0#{agent => Agent1},
|
||||
{S1, SharedSubS1}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% renew_streams internal functions
|
||||
|
||||
accept_stream(#{share_topic_filter := ShareTopicFilter} = Event, S, ScheduledActions) ->
|
||||
%% If we have a pending action (subscribe or unsubscribe) for this topic filter,
|
||||
%% we should not accept a stream and start replaying it. We won't use it anyway:
|
||||
%% * if subscribe is pending, we will reset agent obtain a new lease
|
||||
%% * if unsubscribe is pending, we will drop connection
|
||||
case ScheduledActions of
|
||||
#{ShareTopicFilter := _Action} ->
|
||||
S;
|
||||
_ ->
|
||||
accept_stream(Event, S)
|
||||
end.
|
||||
|
||||
accept_stream(
|
||||
#{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0
|
||||
#{
|
||||
share_topic_filter := ShareTopicFilter,
|
||||
stream := Stream,
|
||||
progress := #{iterator := Iterator} = _Progress
|
||||
} = _Event,
|
||||
S0
|
||||
) ->
|
||||
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
|
||||
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
|
||||
undefined ->
|
||||
%% This should not happen.
|
||||
%% Agent should have received unsubscribe callback
|
||||
%% and should not have passed this stream as a new one
|
||||
error(new_stream_without_sub);
|
||||
%% We unsubscribed
|
||||
S0;
|
||||
#{id := SubId, current_state := SStateId} ->
|
||||
Key = {SubId, Stream},
|
||||
NeedCreateStream =
|
||||
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
|
||||
undefined ->
|
||||
true;
|
||||
#srs{unsubscribed = true} ->
|
||||
true;
|
||||
_SRS ->
|
||||
false
|
||||
end,
|
||||
case NeedCreateStream of
|
||||
true ->
|
||||
NewSRS =
|
||||
#srs{
|
||||
rank_x = ?rank_x,
|
||||
|
@ -294,15 +403,15 @@ accept_stream(
|
|||
},
|
||||
S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0),
|
||||
S1;
|
||||
_SRS ->
|
||||
false ->
|
||||
S0
|
||||
end
|
||||
end.
|
||||
|
||||
revoke_stream(
|
||||
#{topic_filter := TopicFilter, stream := Stream}, S0
|
||||
#{share_topic_filter := ShareTopicFilter, stream := Stream}, S0
|
||||
) ->
|
||||
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
|
||||
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
|
||||
undefined ->
|
||||
%% This should not happen.
|
||||
%% Agent should have received unsubscribe callback
|
||||
|
@ -320,19 +429,363 @@ revoke_stream(
|
|||
end
|
||||
end.
|
||||
|
||||
-spec to_agent_subscription(
|
||||
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()
|
||||
%%--------------------------------------------------------------------
|
||||
%% on_streams_replay
|
||||
|
||||
-spec on_streams_replay(
|
||||
emqx_persistent_session_ds_state:t(),
|
||||
t()
|
||||
) -> {emqx_persistent_session_ds_state:t(), t()}.
|
||||
on_streams_replay(S0, SharedSubS0) ->
|
||||
{S1, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS1} =
|
||||
renew_streams(S0, SharedSubS0),
|
||||
|
||||
Progresses = all_stream_progresses(S1, Agent0),
|
||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
|
||||
Agent0, Progresses
|
||||
),
|
||||
{Agent2, ScheduledActions1} = run_scheduled_actions(S1, Agent1, ScheduledActions0),
|
||||
SharedSubS2 = SharedSubS1#{
|
||||
agent => Agent2,
|
||||
scheduled_actions => ScheduledActions1
|
||||
},
|
||||
{S1, SharedSubS2}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% on_streams_replay internal functions
|
||||
|
||||
all_stream_progresses(S, Agent) ->
|
||||
all_stream_progresses(S, Agent, _NeedUnacked = false).
|
||||
|
||||
all_stream_progresses(S, _Agent, NeedUnacked) ->
|
||||
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
|
||||
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
|
||||
fold_shared_stream_states(
|
||||
fun(ShareTopicFilter, Stream, SRS, ProgressesAcc0) ->
|
||||
case
|
||||
is_stream_started(CommQos1, CommQos2, SRS) and
|
||||
(NeedUnacked or is_stream_fully_acked(CommQos1, CommQos2, SRS))
|
||||
of
|
||||
true ->
|
||||
StreamProgress = stream_progress(CommQos1, CommQos2, Stream, SRS),
|
||||
maps:update_with(
|
||||
ShareTopicFilter,
|
||||
fun(Progresses) -> [StreamProgress | Progresses] end,
|
||||
[StreamProgress],
|
||||
ProgressesAcc0
|
||||
);
|
||||
false ->
|
||||
ProgressesAcc0
|
||||
end
|
||||
end,
|
||||
#{},
|
||||
S
|
||||
).
|
||||
|
||||
run_scheduled_actions(S, Agent, ScheduledActions) ->
|
||||
maps:fold(
|
||||
fun(ShareTopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) ->
|
||||
case run_scheduled_action(S, AgentAcc0, ShareTopicFilter, Action0) of
|
||||
{ok, AgentAcc1} ->
|
||||
{AgentAcc1, maps:remove(ShareTopicFilter, ScheduledActionsAcc)};
|
||||
{continue, Action1} ->
|
||||
{AgentAcc0, ScheduledActionsAcc#{ShareTopicFilter => Action1}}
|
||||
end
|
||||
end,
|
||||
{Agent, ScheduledActions},
|
||||
ScheduledActions
|
||||
).
|
||||
|
||||
run_scheduled_action(
|
||||
S,
|
||||
Agent0,
|
||||
ShareTopicFilter,
|
||||
#{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action
|
||||
) ->
|
||||
emqx_persistent_session_ds_shared_subs_agent:subscription().
|
||||
to_agent_subscription(_S, Subscription) ->
|
||||
StreamKeysToWait1 = filter_unfinished_streams(S, StreamKeysToWait0),
|
||||
Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0,
|
||||
case StreamKeysToWait1 of
|
||||
[] ->
|
||||
?tp(debug, shared_subs_schedule_action_complete, #{
|
||||
share_topic_filter => ShareTopicFilter,
|
||||
progresses => format_stream_progresses(Progresses1),
|
||||
type => Type
|
||||
}),
|
||||
%% Regular progress won't se unsubscribed streams, so we need to
|
||||
%% send the progress explicitly.
|
||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
|
||||
Agent0, #{ShareTopicFilter => Progresses1}
|
||||
),
|
||||
case Type of
|
||||
{?schedule_subscribe, SubOpts} ->
|
||||
{ok,
|
||||
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
|
||||
Agent1, ShareTopicFilter, SubOpts
|
||||
)};
|
||||
?schedule_unsubscribe ->
|
||||
{ok,
|
||||
emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
|
||||
Agent1, ShareTopicFilter, Progresses1
|
||||
)}
|
||||
end;
|
||||
_ ->
|
||||
Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
|
||||
?tp(debug, shared_subs_schedule_action_continue, #{
|
||||
share_topic_filter => ShareTopicFilter,
|
||||
new_action => format_schedule_action(Action1)
|
||||
}),
|
||||
{continue, Action1}
|
||||
end.
|
||||
|
||||
filter_unfinished_streams(S, StreamKeysToWait) ->
|
||||
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
|
||||
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
|
||||
lists:filter(
|
||||
fun(Key) ->
|
||||
case emqx_persistent_session_ds_state:get_stream(Key, S) of
|
||||
undefined ->
|
||||
%% This should not happen: we should see any stream
|
||||
%% in completed state before deletion
|
||||
true;
|
||||
SRS ->
|
||||
not is_stream_fully_acked(CommQos1, CommQos2, SRS)
|
||||
end
|
||||
end,
|
||||
StreamKeysToWait
|
||||
).
|
||||
|
||||
stream_progresses(S, StreamKeys) ->
|
||||
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
|
||||
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
|
||||
lists:map(
|
||||
fun({_SubId, Stream} = Key) ->
|
||||
SRS = emqx_persistent_session_ds_state:get_stream(Key, S),
|
||||
stream_progress(CommQos1, CommQos2, Stream, SRS)
|
||||
end,
|
||||
StreamKeys
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% on_disconnect
|
||||
|
||||
on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
|
||||
S1 = revoke_all_streams(S0),
|
||||
Progresses = all_stream_progresses(S1, Agent0, _NeedUnacked = true),
|
||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses),
|
||||
SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}},
|
||||
{S1, SharedSubS1}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% on_disconnect helpers
|
||||
|
||||
revoke_all_streams(S0) ->
|
||||
fold_shared_stream_states(
|
||||
fun(ShareTopicFilter, Stream, _SRS, S) ->
|
||||
revoke_stream(#{share_topic_filter => ShareTopicFilter, stream => Stream}, S)
|
||||
end,
|
||||
S0,
|
||||
S0
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% on_info
|
||||
|
||||
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
|
||||
{emqx_persistent_session_ds_state:t(), t()}.
|
||||
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
|
||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
|
||||
SharedSubS1 = SharedSubS0#{agent => Agent1},
|
||||
{S, SharedSubS1}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% to_map
|
||||
|
||||
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
|
||||
to_map(S, _SharedSubS) ->
|
||||
fold_shared_subs(
|
||||
fun(ShareTopicFilter, _, Acc) -> Acc#{ShareTopicFilter => lookup(ShareTopicFilter, S)} end,
|
||||
#{},
|
||||
S
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% cold_get_subscription
|
||||
|
||||
-spec cold_get_subscription(emqx_persistent_session_ds:id(), share_topic_filter()) ->
|
||||
emqx_persistent_session_ds:subscription() | undefined.
|
||||
cold_get_subscription(SessionId, ShareTopicFilter) ->
|
||||
case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, ShareTopicFilter) of
|
||||
[Sub = #{current_state := SStateId}] ->
|
||||
case
|
||||
emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId)
|
||||
of
|
||||
[#{subopts := Subopts}] ->
|
||||
Sub#{subopts => Subopts};
|
||||
_ ->
|
||||
undefined
|
||||
end;
|
||||
_ ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Generic helpers
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
lookup(ShareTopicFilter, S) ->
|
||||
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S) of
|
||||
Sub = #{current_state := SStateId} ->
|
||||
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
|
||||
#{subopts := SubOpts} ->
|
||||
Sub#{subopts => SubOpts};
|
||||
undefined ->
|
||||
undefined
|
||||
end;
|
||||
undefined ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
stream_keys_by_sub_id(S, MatchSubId) ->
|
||||
emqx_persistent_session_ds_state:fold_streams(
|
||||
fun({SubId, _Stream} = StreamKey, _SRS, StreamKeys) ->
|
||||
case SubId of
|
||||
MatchSubId ->
|
||||
[StreamKey | StreamKeys];
|
||||
_ ->
|
||||
StreamKeys
|
||||
end
|
||||
end,
|
||||
[],
|
||||
S
|
||||
).
|
||||
|
||||
stream_progress(
|
||||
CommQos1,
|
||||
CommQos2,
|
||||
Stream,
|
||||
#srs{
|
||||
it_end = EndIt,
|
||||
it_begin = BeginIt
|
||||
} = SRS
|
||||
) ->
|
||||
Iterator =
|
||||
case is_stream_fully_acked(CommQos1, CommQos2, SRS) of
|
||||
true -> EndIt;
|
||||
false -> BeginIt
|
||||
end,
|
||||
#{
|
||||
stream => Stream,
|
||||
progress => #{
|
||||
iterator => Iterator
|
||||
},
|
||||
use_finished => is_use_finished(SRS)
|
||||
}.
|
||||
|
||||
fold_shared_subs(Fun, Acc, S) ->
|
||||
emqx_persistent_session_ds_state:fold_subscriptions(
|
||||
fun
|
||||
(#share{} = ShareTopicFilter, Sub, Acc0) -> Fun(ShareTopicFilter, Sub, Acc0);
|
||||
(_, _Sub, Acc0) -> Acc0
|
||||
end,
|
||||
Acc,
|
||||
S
|
||||
).
|
||||
|
||||
fold_shared_stream_states(Fun, Acc, S) ->
|
||||
%% TODO
|
||||
%% do we need anything from sub state?
|
||||
%% Optimize or cache
|
||||
ShareTopicFilters = fold_shared_subs(
|
||||
fun
|
||||
(#share{} = ShareTopicFilter, #{id := Id} = _Sub, Acc0) ->
|
||||
Acc0#{Id => ShareTopicFilter};
|
||||
(_, _, Acc0) ->
|
||||
Acc0
|
||||
end,
|
||||
#{},
|
||||
S
|
||||
),
|
||||
emqx_persistent_session_ds_state:fold_streams(
|
||||
fun({SubId, Stream}, SRS, Acc0) ->
|
||||
case ShareTopicFilters of
|
||||
#{SubId := ShareTopicFilter} ->
|
||||
Fun(ShareTopicFilter, Stream, SRS, Acc0);
|
||||
_ ->
|
||||
Acc0
|
||||
end
|
||||
end,
|
||||
Acc,
|
||||
S
|
||||
).
|
||||
|
||||
to_agent_subscription(_S, Subscription) ->
|
||||
maps:with([start_time], Subscription).
|
||||
|
||||
-spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts().
|
||||
agent_opts(#{session_id := SessionId}) ->
|
||||
#{session_id => SessionId}.
|
||||
|
||||
-dialyzer({nowarn_function, now_ms/0}).
|
||||
now_ms() ->
|
||||
erlang:system_time(millisecond).
|
||||
|
||||
is_use_finished(#srs{unsubscribed = Unsubscribed}) ->
|
||||
Unsubscribed.
|
||||
|
||||
is_stream_started(CommQos1, CommQos2, #srs{first_seqno_qos1 = Q1, last_seqno_qos1 = Q2}) ->
|
||||
(CommQos1 >= Q1) or (CommQos2 >= Q2).
|
||||
|
||||
is_stream_fully_acked(_, _, #srs{
|
||||
first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2
|
||||
}) ->
|
||||
%% Streams where the last chunk doesn't contain any QoS1 and 2
|
||||
%% messages are considered fully acked:
|
||||
true;
|
||||
is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
|
||||
(Comm1 >= S1) andalso (Comm2 >= S2).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Formatters
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
format_schedule_action(#{
|
||||
type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait
|
||||
}) ->
|
||||
#{
|
||||
type => Type,
|
||||
progresses => format_stream_progresses(Progresses),
|
||||
stream_keys_to_wait => format_stream_keys(StreamKeysToWait)
|
||||
}.
|
||||
|
||||
format_stream_progresses(Streams) ->
|
||||
lists:map(
|
||||
fun format_stream_progress/1,
|
||||
Streams
|
||||
).
|
||||
|
||||
format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
|
||||
Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
|
||||
|
||||
format_progress(#{iterator := Iterator} = Progress) ->
|
||||
Progress#{iterator => format_opaque(Iterator)}.
|
||||
|
||||
format_stream_key(beginning) -> beginning;
|
||||
format_stream_key({SubId, Stream}) -> {SubId, format_opaque(Stream)}.
|
||||
|
||||
format_stream_keys(StreamKeys) ->
|
||||
lists:map(
|
||||
fun format_stream_key/1,
|
||||
StreamKeys
|
||||
).
|
||||
|
||||
format_lease_events(Events) ->
|
||||
lists:map(
|
||||
fun format_lease_event/1,
|
||||
Events
|
||||
).
|
||||
|
||||
format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
|
||||
Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
|
||||
format_lease_event(#{stream := Stream} = Event) ->
|
||||
Event#{stream => format_opaque(Stream)}.
|
||||
|
||||
format_opaque(Opaque) ->
|
||||
erlang:phash2(Opaque).
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
}.
|
||||
|
||||
-type t() :: term().
|
||||
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
|
||||
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
|
||||
|
||||
-type opts() :: #{
|
||||
session_id := session_id()
|
||||
|
@ -28,41 +28,44 @@
|
|||
-type stream_lease() :: #{
|
||||
type => lease,
|
||||
%% Used as "external" subscription_id
|
||||
topic_filter := topic_filter(),
|
||||
share_topic_filter := share_topic_filter(),
|
||||
stream := emqx_ds:stream(),
|
||||
iterator := emqx_ds:iterator()
|
||||
}.
|
||||
|
||||
-type stream_revoke() :: #{
|
||||
type => revoke,
|
||||
topic_filter := topic_filter(),
|
||||
share_topic_filter := share_topic_filter(),
|
||||
stream := emqx_ds:stream()
|
||||
}.
|
||||
|
||||
-type stream_lease_event() :: stream_lease() | stream_revoke().
|
||||
|
||||
-type stream_progress() :: #{
|
||||
topic_filter := topic_filter(),
|
||||
share_topic_filter := share_topic_filter(),
|
||||
stream := emqx_ds:stream(),
|
||||
iterator := emqx_ds:iterator()
|
||||
iterator := emqx_ds:iterator(),
|
||||
use_finished := boolean()
|
||||
}.
|
||||
|
||||
-export_type([
|
||||
t/0,
|
||||
subscription/0,
|
||||
session_id/0,
|
||||
stream_lease/0,
|
||||
stream_lease_event/0,
|
||||
opts/0
|
||||
]).
|
||||
|
||||
-export([
|
||||
new/1,
|
||||
open/2,
|
||||
can_subscribe/3,
|
||||
|
||||
on_subscribe/3,
|
||||
on_unsubscribe/2,
|
||||
on_unsubscribe/3,
|
||||
on_stream_progress/2,
|
||||
on_info/2,
|
||||
on_disconnect/2,
|
||||
|
||||
renew_streams/1
|
||||
]).
|
||||
|
@ -77,12 +80,13 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
-callback new(opts()) -> t().
|
||||
-callback open([{topic_filter(), subscription()}], opts()) -> t().
|
||||
-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
|
||||
{ok, t()} | {error, term()}.
|
||||
-callback on_unsubscribe(t(), topic_filter()) -> t().
|
||||
-callback open([{share_topic_filter(), subscription()}], opts()) -> t().
|
||||
-callback can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
|
||||
-callback on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
|
||||
-callback on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
|
||||
-callback on_disconnect(t(), [stream_progress()]) -> t().
|
||||
-callback renew_streams(t()) -> {[stream_lease_event()], t()}.
|
||||
-callback on_stream_progress(t(), [stream_progress()]) -> t().
|
||||
-callback on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
|
||||
-callback on_info(t(), term()) -> t().
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -93,24 +97,31 @@
|
|||
new(Opts) ->
|
||||
?shared_subs_agent:new(Opts).
|
||||
|
||||
-spec open([{topic_filter(), subscription()}], opts()) -> t().
|
||||
-spec open([{share_topic_filter(), subscription()}], opts()) -> t().
|
||||
open(Topics, Opts) ->
|
||||
?shared_subs_agent:open(Topics, Opts).
|
||||
|
||||
-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
|
||||
{ok, t()} | {error, emqx_types:reason_code()}.
|
||||
on_subscribe(Agent, TopicFilter, SubOpts) ->
|
||||
?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts).
|
||||
-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
|
||||
can_subscribe(Agent, ShareTopicFilter, SubOpts) ->
|
||||
?shared_subs_agent:can_subscribe(Agent, ShareTopicFilter, SubOpts).
|
||||
|
||||
-spec on_unsubscribe(t(), topic_filter()) -> t().
|
||||
on_unsubscribe(Agent, TopicFilter) ->
|
||||
?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
|
||||
-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
|
||||
on_subscribe(Agent, ShareTopicFilter, SubOpts) ->
|
||||
?shared_subs_agent:on_subscribe(Agent, ShareTopicFilter, SubOpts).
|
||||
|
||||
-spec on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
|
||||
on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses) ->
|
||||
?shared_subs_agent:on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses).
|
||||
|
||||
-spec on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
|
||||
on_disconnect(Agent, StreamProgresses) ->
|
||||
?shared_subs_agent:on_disconnect(Agent, StreamProgresses).
|
||||
|
||||
-spec renew_streams(t()) -> {[stream_lease_event()], t()}.
|
||||
renew_streams(Agent) ->
|
||||
?shared_subs_agent:renew_streams(Agent).
|
||||
|
||||
-spec on_stream_progress(t(), [stream_progress()]) -> t().
|
||||
-spec on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
|
||||
on_stream_progress(Agent, StreamProgress) ->
|
||||
?shared_subs_agent:on_stream_progress(Agent, StreamProgress).
|
||||
|
||||
|
|
|
@ -9,11 +9,13 @@
|
|||
-export([
|
||||
new/1,
|
||||
open/2,
|
||||
can_subscribe/3,
|
||||
|
||||
on_subscribe/3,
|
||||
on_unsubscribe/2,
|
||||
on_unsubscribe/3,
|
||||
on_stream_progress/2,
|
||||
on_info/2,
|
||||
on_disconnect/2,
|
||||
|
||||
renew_streams/1
|
||||
]).
|
||||
|
@ -30,10 +32,16 @@ new(_Opts) ->
|
|||
open(_Topics, _Opts) ->
|
||||
undefined.
|
||||
|
||||
on_subscribe(_Agent, _TopicFilter, _SubOpts) ->
|
||||
can_subscribe(_Agent, _TopicFilter, _SubOpts) ->
|
||||
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}.
|
||||
|
||||
on_unsubscribe(Agent, _TopicFilter) ->
|
||||
on_subscribe(Agent, _TopicFilter, _SubOpts) ->
|
||||
Agent.
|
||||
|
||||
on_unsubscribe(Agent, _TopicFilter, _Progresses) ->
|
||||
Agent.
|
||||
|
||||
on_disconnect(Agent, _) ->
|
||||
Agent.
|
||||
|
||||
renew_streams(Agent) ->
|
||||
|
|
|
@ -399,7 +399,9 @@ new_id(Rec) ->
|
|||
get_subscription(TopicFilter, Rec) ->
|
||||
gen_get(?subscriptions, TopicFilter, Rec).
|
||||
|
||||
-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
|
||||
-spec cold_get_subscription(
|
||||
emqx_persistent_session_ds:id(), emqx_types:topic() | emqx_types:share()
|
||||
) ->
|
||||
[emqx_persistent_session_ds_subs:subscription()].
|
||||
cold_get_subscription(SessionId, Topic) ->
|
||||
kv_pmap_read(?subscription_tab, SessionId, Topic).
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-record(ps_route, {
|
||||
topic :: binary(),
|
||||
dest :: emqx_persistent_session_ds:id() | '_'
|
||||
dest :: emqx_persistent_session_ds_router:dest() | '_'
|
||||
}).
|
||||
|
||||
-record(ps_routeidx, {
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
%% Till full implementation we need to dispach to the null agent.
|
||||
%% It will report "not implemented" error for attempts to use shared subscriptions.
|
||||
-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
|
||||
% -define(shared_subs_agent, emqx_ds_shared_sub_agent).
|
||||
|
||||
%% end of -ifdef(TEST).
|
||||
-endif.
|
||||
|
|
|
@ -62,7 +62,7 @@
|
|||
streams := [{pid(), quicer:stream_handle()}],
|
||||
%% New stream opts
|
||||
stream_opts := map(),
|
||||
%% If conneciton is resumed from session ticket
|
||||
%% If connection is resumed from session ticket
|
||||
is_resumed => boolean(),
|
||||
%% mqtt message serializer config
|
||||
serialize => undefined,
|
||||
|
@ -70,8 +70,8 @@
|
|||
}.
|
||||
-type cb_ret() :: quicer_lib:cb_ret().
|
||||
|
||||
%% @doc Data streams initializions are started in parallel with control streams, data streams are blocked
|
||||
%% for the activation from control stream after it is accepted as a legit conneciton.
|
||||
%% @doc Data streams initializations are started in parallel with control streams, data streams are blocked
|
||||
%% for the activation from control stream after it is accepted as a legit connection.
|
||||
%% For security, the initial number of allowed data streams from client should be limited by
|
||||
%% 'peer_bidi_stream_count` & 'peer_unidi_stream_count`
|
||||
-spec activate_data_streams(pid(), {
|
||||
|
@ -80,7 +80,7 @@
|
|||
activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
|
||||
gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
|
||||
|
||||
%% @doc conneciton owner init callback
|
||||
%% @doc connection owner init callback
|
||||
-spec init(map()) -> {ok, cb_state()}.
|
||||
init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
|
||||
init(S#{stream_opts := maps:from_list(SOpts)});
|
||||
|
|
|
@ -351,6 +351,7 @@ fields("authz_cache") ->
|
|||
#{
|
||||
default => true,
|
||||
required => true,
|
||||
importance => ?IMPORTANCE_NO_DOC,
|
||||
desc => ?DESC(fields_cache_enable)
|
||||
}
|
||||
)},
|
||||
|
@ -387,6 +388,7 @@ fields("flapping_detect") ->
|
|||
boolean(),
|
||||
#{
|
||||
default => false,
|
||||
%% importance => ?IMPORTANCE_NO_DOC,
|
||||
desc => ?DESC(flapping_detect_enable)
|
||||
}
|
||||
)},
|
||||
|
@ -423,6 +425,7 @@ fields("force_shutdown") ->
|
|||
boolean(),
|
||||
#{
|
||||
default => true,
|
||||
importance => ?IMPORTANCE_NO_DOC,
|
||||
desc => ?DESC(force_shutdown_enable)
|
||||
}
|
||||
)},
|
||||
|
@ -452,6 +455,7 @@ fields("overload_protection") ->
|
|||
boolean(),
|
||||
#{
|
||||
desc => ?DESC(overload_protection_enable),
|
||||
%% importance => ?IMPORTANCE_NO_DOC,
|
||||
default => false
|
||||
}
|
||||
)},
|
||||
|
@ -512,7 +516,11 @@ fields("force_gc") ->
|
|||
{"enable",
|
||||
sc(
|
||||
boolean(),
|
||||
#{default => true, desc => ?DESC(force_gc_enable)}
|
||||
#{
|
||||
default => true,
|
||||
importance => ?IMPORTANCE_NO_DOC,
|
||||
desc => ?DESC(force_gc_enable)
|
||||
}
|
||||
)},
|
||||
{"count",
|
||||
sc(
|
||||
|
@ -1665,6 +1673,7 @@ fields("durable_sessions") ->
|
|||
sc(
|
||||
boolean(), #{
|
||||
desc => ?DESC(durable_sessions_enable),
|
||||
%% importance => ?IMPORTANCE_NO_DOC,
|
||||
default => false
|
||||
}
|
||||
)},
|
||||
|
@ -1888,6 +1897,7 @@ base_listener(Bind) ->
|
|||
#{
|
||||
default => true,
|
||||
aliases => [enabled],
|
||||
importance => ?IMPORTANCE_NO_DOC,
|
||||
desc => ?DESC(fields_listener_enabled)
|
||||
}
|
||||
)},
|
||||
|
@ -2416,6 +2426,7 @@ client_ssl_opts_schema(Defaults) ->
|
|||
boolean(),
|
||||
#{
|
||||
default => false,
|
||||
%% importance => ?IMPORTANCE_NO_DOC,
|
||||
desc => ?DESC(client_ssl_opts_schema_enable)
|
||||
}
|
||||
)},
|
||||
|
|
|
@ -589,6 +589,14 @@ ensure_valid_options(Options, Versions) ->
|
|||
|
||||
ensure_valid_options([], _, Acc) ->
|
||||
lists:reverse(Acc);
|
||||
ensure_valid_options([{K, undefined} | T], Versions, Acc) when
|
||||
K =:= crl_check;
|
||||
K =:= crl_cache
|
||||
->
|
||||
%% Note: we must set crl options to `undefined' to unset them. Otherwise,
|
||||
%% `esockd' will retain such options when `esockd:merge_opts/2' is called and the SSL
|
||||
%% options were previously enabled.
|
||||
ensure_valid_options(T, Versions, [{K, undefined} | Acc]);
|
||||
ensure_valid_options([{_, undefined} | T], Versions, Acc) ->
|
||||
ensure_valid_options(T, Versions, Acc);
|
||||
ensure_valid_options([{_, ""} | T], Versions, Acc) ->
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
-include("emqx_mqtt.hrl").
|
||||
|
||||
-export([format/2]).
|
||||
-export([format_meta_map/1]).
|
||||
|
||||
%% logger_formatter:config/0 is not exported.
|
||||
-type config() :: map().
|
||||
|
@ -43,10 +42,6 @@ format(
|
|||
format(Event, Config) ->
|
||||
emqx_logger_textfmt:format(Event, Config).
|
||||
|
||||
format_meta_map(Meta) ->
|
||||
Encode = emqx_trace_handler:payload_encode(),
|
||||
format_meta_map(Meta, Encode).
|
||||
|
||||
format_meta_map(Meta, Encode) ->
|
||||
format_meta_map(Meta, Encode, [
|
||||
{packet, fun format_packet/2},
|
||||
|
|
|
@ -436,6 +436,7 @@ websocket_handle({Frame, _}, State) ->
|
|||
%% TODO: should not close the ws connection
|
||||
?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
|
||||
shutdown(unexpected_ws_frame, State).
|
||||
|
||||
websocket_info({call, From, Req}, State) ->
|
||||
handle_call(From, Req, State);
|
||||
websocket_info({cast, rate_limit}, State) ->
|
||||
|
@ -737,7 +738,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|||
input_bytes => Data
|
||||
}),
|
||||
FrameError = {frame_error, Reason},
|
||||
{[{incoming, FrameError} | Packets], State};
|
||||
NState = enrich_state(Reason, State),
|
||||
{[{incoming, FrameError} | Packets], NState};
|
||||
error:Reason:Stacktrace ->
|
||||
?LOG(error, #{
|
||||
at_state => emqx_frame:describe_state(ParseState),
|
||||
|
@ -830,7 +832,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
?LOG(warning, #{
|
||||
msg => "packet_discarded",
|
||||
reason => "frame_too_large",
|
||||
packet => emqx_packet:format(Packet)
|
||||
packet => Packet
|
||||
}),
|
||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
|
@ -1069,6 +1071,13 @@ check_max_connection(Type, Listener) ->
|
|||
{denny, Reason}
|
||||
end
|
||||
end.
|
||||
|
||||
enrich_state(#{parse_state := NParseState}, State) ->
|
||||
Serialize = emqx_frame:serialize_opts(NParseState),
|
||||
State#state{parse_state = NParseState, serialize = Serialize};
|
||||
enrich_state(_, State) ->
|
||||
State.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% For CT tests
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -414,11 +414,18 @@ t_handle_in_auth(_) ->
|
|||
emqx_channel:handle_in(?AUTH_PACKET(), Channel).
|
||||
|
||||
t_handle_in_frame_error(_) ->
|
||||
IdleChannel = channel(#{conn_state => idle}),
|
||||
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan} =
|
||||
emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, IdleChannel),
|
||||
IdleChannelV5 = channel(#{conn_state => idle}),
|
||||
%% no CONNACK packet for v4
|
||||
?assertMatch(
|
||||
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan},
|
||||
emqx_channel:handle_in(
|
||||
{frame_error, #{cause => frame_too_large}}, v4(IdleChannelV5)
|
||||
)
|
||||
),
|
||||
|
||||
ConnectingChan = channel(#{conn_state => connecting}),
|
||||
ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
|
||||
?assertMatch(
|
||||
{shutdown,
|
||||
#{
|
||||
shutdown_count := frame_too_large,
|
||||
|
@ -426,12 +433,13 @@ t_handle_in_frame_error(_) ->
|
|||
limit := 100,
|
||||
received := 101
|
||||
},
|
||||
ConnackPacket,
|
||||
_} =
|
||||
ConnackPacket, _},
|
||||
emqx_channel:handle_in(
|
||||
{frame_error, #{cause => frame_too_large, received => 101, limit => 100}},
|
||||
ConnectingChan
|
||||
)
|
||||
),
|
||||
|
||||
DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
|
||||
ConnectedChan = channel(#{conn_state => connected}),
|
||||
?assertMatch(
|
||||
|
|
|
@ -78,6 +78,7 @@
|
|||
start_epmd/0,
|
||||
start_peer/2,
|
||||
stop_peer/1,
|
||||
ebin_path/0,
|
||||
listener_port/2
|
||||
]).
|
||||
|
||||
|
|
|
@ -138,13 +138,14 @@ init_per_testcase(t_refresh_config = TestCase, Config) ->
|
|||
];
|
||||
init_per_testcase(TestCase, Config) when
|
||||
TestCase =:= t_update_listener;
|
||||
TestCase =:= t_update_listener_enable_disable;
|
||||
TestCase =:= t_validations
|
||||
->
|
||||
ct:timetrap({seconds, 30}),
|
||||
ok = snabbkaffe:start_trace(),
|
||||
%% when running emqx standalone tests, we can't use those
|
||||
%% features.
|
||||
case does_module_exist(emqx_management) of
|
||||
case does_module_exist(emqx_mgmt) of
|
||||
true ->
|
||||
DataDir = ?config(data_dir, Config),
|
||||
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
|
||||
|
@ -165,7 +166,7 @@ init_per_testcase(TestCase, Config) when
|
|||
{emqx_conf, #{config => #{listeners => #{ssl => #{default => ListenerConf}}}}},
|
||||
emqx,
|
||||
emqx_management,
|
||||
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
|
||||
emqx_mgmt_api_test_util:emqx_dashboard()
|
||||
],
|
||||
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
|
||||
),
|
||||
|
@ -206,6 +207,7 @@ read_crl(Filename) ->
|
|||
|
||||
end_per_testcase(TestCase, Config) when
|
||||
TestCase =:= t_update_listener;
|
||||
TestCase =:= t_update_listener_enable_disable;
|
||||
TestCase =:= t_validations
|
||||
->
|
||||
Skip = proplists:get_bool(skip_does_not_apply, Config),
|
||||
|
@ -1057,3 +1059,104 @@ do_t_validations(_Config) ->
|
|||
),
|
||||
|
||||
ok.
|
||||
|
||||
%% Checks that if CRL is ever enabled and then disabled, clients can connect, even if they
|
||||
%% would otherwise not have their corresponding CRLs cached and fail with `{bad_crls,
|
||||
%% no_relevant_crls}`.
|
||||
t_update_listener_enable_disable(Config) ->
|
||||
case proplists:get_bool(skip_does_not_apply, Config) of
|
||||
true ->
|
||||
ct:pal("skipping as this test does not apply in this profile"),
|
||||
ok;
|
||||
false ->
|
||||
do_t_update_listener_enable_disable(Config)
|
||||
end.
|
||||
|
||||
do_t_update_listener_enable_disable(Config) ->
|
||||
DataDir = ?config(data_dir, Config),
|
||||
Keyfile = filename:join([DataDir, "server.key.pem"]),
|
||||
Certfile = filename:join([DataDir, "server.cert.pem"]),
|
||||
Cacertfile = filename:join([DataDir, "ca-chain.cert.pem"]),
|
||||
ClientCert = filename:join(DataDir, "client.cert.pem"),
|
||||
ClientKey = filename:join(DataDir, "client.key.pem"),
|
||||
|
||||
ListenerId = "ssl:default",
|
||||
%% Enable CRL
|
||||
{ok, {{_, 200, _}, _, ListenerData0}} = get_listener_via_api(ListenerId),
|
||||
CRLConfig0 =
|
||||
#{
|
||||
<<"ssl_options">> =>
|
||||
#{
|
||||
<<"keyfile">> => Keyfile,
|
||||
<<"certfile">> => Certfile,
|
||||
<<"cacertfile">> => Cacertfile,
|
||||
<<"enable_crl_check">> => true,
|
||||
<<"fail_if_no_peer_cert">> => true
|
||||
}
|
||||
},
|
||||
ListenerData1 = emqx_utils_maps:deep_merge(ListenerData0, CRLConfig0),
|
||||
{ok, {_, _, ListenerData2}} = update_listener_via_api(ListenerId, ListenerData1),
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"ssl_options">> :=
|
||||
#{
|
||||
<<"enable_crl_check">> := true,
|
||||
<<"verify">> := <<"verify_peer">>,
|
||||
<<"fail_if_no_peer_cert">> := true
|
||||
}
|
||||
},
|
||||
ListenerData2
|
||||
),
|
||||
|
||||
%% Disable CRL
|
||||
CRLConfig1 =
|
||||
#{
|
||||
<<"ssl_options">> =>
|
||||
#{
|
||||
<<"keyfile">> => Keyfile,
|
||||
<<"certfile">> => Certfile,
|
||||
<<"cacertfile">> => Cacertfile,
|
||||
<<"enable_crl_check">> => false,
|
||||
<<"fail_if_no_peer_cert">> => true
|
||||
}
|
||||
},
|
||||
ListenerData3 = emqx_utils_maps:deep_merge(ListenerData2, CRLConfig1),
|
||||
redbug:start(
|
||||
[
|
||||
"esockd_server:get_listener_prop -> return",
|
||||
"esockd_server:set_listener_prop -> return",
|
||||
"esockd:merge_opts -> return",
|
||||
"esockd_listener_sup:set_options -> return",
|
||||
"emqx_listeners:inject_crl_config -> return"
|
||||
],
|
||||
[{msgs, 100}]
|
||||
),
|
||||
{ok, {_, _, ListenerData4}} = update_listener_via_api(ListenerId, ListenerData3),
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"ssl_options">> :=
|
||||
#{
|
||||
<<"enable_crl_check">> := false,
|
||||
<<"verify">> := <<"verify_peer">>,
|
||||
<<"fail_if_no_peer_cert">> := true
|
||||
}
|
||||
},
|
||||
ListenerData4
|
||||
),
|
||||
|
||||
%% Now the client that would be blocked tries to connect and should now be allowed.
|
||||
{ok, C} = emqtt:start_link([
|
||||
{ssl, true},
|
||||
{ssl_opts, [
|
||||
{certfile, ClientCert},
|
||||
{keyfile, ClientKey},
|
||||
{verify, verify_none}
|
||||
]},
|
||||
{port, 8883}
|
||||
]),
|
||||
?assertMatch({ok, _}, emqtt:connect(C)),
|
||||
emqtt:stop(C),
|
||||
|
||||
?assertNotReceive({http_get, _}),
|
||||
|
||||
ok.
|
||||
|
|
|
@ -79,6 +79,8 @@
|
|||
%% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs
|
||||
-export([schema_module/0, upgrade_raw_conf/1]).
|
||||
|
||||
-export([skip_if_oss/0]).
|
||||
|
||||
-export_type([appspec/0]).
|
||||
-export_type([appspec_opts/0]).
|
||||
|
||||
|
@ -389,6 +391,8 @@ default_appspec(emqx_schema_validation, _SuiteOpts) ->
|
|||
#{schema_mod => emqx_schema_validation_schema, config => #{}};
|
||||
default_appspec(emqx_message_transformation, _SuiteOpts) ->
|
||||
#{schema_mod => emqx_message_transformation_schema, config => #{}};
|
||||
default_appspec(emqx_ds_shared_sub, _SuiteOpts) ->
|
||||
#{schema_mod => emqx_ds_shared_sub_schema, config => #{}};
|
||||
default_appspec(_, _) ->
|
||||
#{}.
|
||||
|
||||
|
@ -519,3 +523,14 @@ upgrade_raw_conf(Conf) ->
|
|||
ce ->
|
||||
emqx_conf_schema:upgrade_raw_conf(Conf)
|
||||
end.
|
||||
|
||||
skip_if_oss() ->
|
||||
try emqx_release:edition() of
|
||||
ee ->
|
||||
false;
|
||||
_ ->
|
||||
{skip, not_supported_in_oss}
|
||||
catch
|
||||
error:undef ->
|
||||
{skip, standalone_not_supported}
|
||||
end.
|
||||
|
|
|
@ -56,6 +56,8 @@ t_exclusive_sub(_) ->
|
|||
{ok, _} = emqtt:connect(C1),
|
||||
?CHECK_SUB(C1, 0),
|
||||
|
||||
?CHECK_SUB(C1, 0),
|
||||
|
||||
{ok, C2} = emqtt:start_link([
|
||||
{clientid, <<"client2">>},
|
||||
{clean_start, false},
|
||||
|
|
|
@ -63,6 +63,7 @@ groups() ->
|
|||
t_parse_malformed_properties,
|
||||
t_malformed_connect_header,
|
||||
t_malformed_connect_data,
|
||||
t_malformed_connect_data_proto_ver,
|
||||
t_reserved_connect_flag,
|
||||
t_invalid_clientid,
|
||||
t_undefined_password,
|
||||
|
@ -167,6 +168,8 @@ t_parse_malformed_utf8_string(_) ->
|
|||
ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}),
|
||||
?ASSERT_FRAME_THROW(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
|
||||
|
||||
%% TODO: parse v3 with 0 length clientid
|
||||
|
||||
t_serialize_parse_v3_connect(_) ->
|
||||
Bin =
|
||||
<<16, 37, 0, 6, 77, 81, 73, 115, 100, 112, 3, 2, 0, 60, 0, 23, 109, 111, 115, 113, 112, 117,
|
||||
|
@ -324,7 +327,7 @@ t_serialize_parse_bridge_connect(_) ->
|
|||
header = #mqtt_packet_header{type = ?CONNECT},
|
||||
variable = #mqtt_packet_connect{
|
||||
clientid = <<"C_00:0C:29:2B:77:52">>,
|
||||
proto_ver = 16#03,
|
||||
proto_ver = ?MQTT_PROTO_V3,
|
||||
proto_name = <<"MQIsdp">>,
|
||||
is_bridge = true,
|
||||
will_retain = true,
|
||||
|
@ -686,15 +689,36 @@ t_malformed_connect_header(_) ->
|
|||
).
|
||||
|
||||
t_malformed_connect_data(_) ->
|
||||
ProtoNameWithLen = <<0, 6, "MQIsdp">>,
|
||||
ConnectFlags = <<2#00000000>>,
|
||||
ClientIdwithLen = <<0, 1, "a">>,
|
||||
UnexpectedRestBin = <<0, 1, 2>>,
|
||||
?ASSERT_FRAME_THROW(
|
||||
#{cause := malformed_connect, unexpected_trailing_bytes := _},
|
||||
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 0, 0, 0, 0, 0, 0>>)
|
||||
#{cause := malformed_connect, unexpected_trailing_bytes := 3},
|
||||
emqx_frame:parse(
|
||||
<<16, 18, ProtoNameWithLen/binary, ?MQTT_PROTO_V3, ConnectFlags/binary, 0, 0,
|
||||
ClientIdwithLen/binary, UnexpectedRestBin/binary>>
|
||||
)
|
||||
).
|
||||
|
||||
t_malformed_connect_data_proto_ver(_) ->
|
||||
Proto3NameWithLen = <<0, 6, "MQIsdp">>,
|
||||
?ASSERT_FRAME_THROW(
|
||||
#{cause := malformed_connect, header_bytes := <<>>},
|
||||
emqx_frame:parse(<<16, 8, Proto3NameWithLen/binary>>)
|
||||
),
|
||||
ProtoNameWithLen = <<0, 4, "MQTT">>,
|
||||
?ASSERT_FRAME_THROW(
|
||||
#{cause := malformed_connect, header_bytes := <<>>},
|
||||
emqx_frame:parse(<<16, 6, ProtoNameWithLen/binary>>)
|
||||
).
|
||||
|
||||
t_reserved_connect_flag(_) ->
|
||||
?assertException(
|
||||
throw,
|
||||
{frame_parse_error, reserved_connect_flag},
|
||||
{frame_parse_error, #{
|
||||
cause := reserved_connect_flag, proto_ver := ?MQTT_PROTO_V3, proto_name := <<"MQIsdp">>
|
||||
}},
|
||||
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 1, 0, 0, 1, 0, 0>>)
|
||||
).
|
||||
|
||||
|
@ -726,7 +750,7 @@ t_undefined_password(_) ->
|
|||
},
|
||||
variable = #mqtt_packet_connect{
|
||||
proto_name = <<"MQTT">>,
|
||||
proto_ver = 4,
|
||||
proto_ver = ?MQTT_PROTO_V4,
|
||||
is_bridge = false,
|
||||
clean_start = true,
|
||||
will_flag = false,
|
||||
|
@ -774,7 +798,9 @@ t_invalid_will_retain(_) ->
|
|||
54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>,
|
||||
?assertException(
|
||||
throw,
|
||||
{frame_parse_error, invalid_will_retain},
|
||||
{frame_parse_error, #{
|
||||
cause := invalid_will_retain, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
|
||||
}},
|
||||
emqx_frame:parse(ConnectBin)
|
||||
),
|
||||
ok.
|
||||
|
@ -796,22 +822,30 @@ t_invalid_will_qos(_) ->
|
|||
),
|
||||
?assertException(
|
||||
throw,
|
||||
{frame_parse_error, invalid_will_qos},
|
||||
{frame_parse_error, #{
|
||||
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
|
||||
}},
|
||||
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1))
|
||||
),
|
||||
?assertException(
|
||||
throw,
|
||||
{frame_parse_error, invalid_will_qos},
|
||||
{frame_parse_error, #{
|
||||
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
|
||||
}},
|
||||
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2))
|
||||
),
|
||||
?assertException(
|
||||
throw,
|
||||
{frame_parse_error, invalid_will_qos},
|
||||
{frame_parse_error, #{
|
||||
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
|
||||
}},
|
||||
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3))
|
||||
),
|
||||
?assertException(
|
||||
throw,
|
||||
{frame_parse_error, invalid_will_qos},
|
||||
{frame_parse_error, #{
|
||||
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
|
||||
}},
|
||||
emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3))
|
||||
),
|
||||
ok.
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
%% Have to use real msgs, as the schema is guarded by enum.
|
||||
-define(THROTTLE_MSG, authorization_permission_denied).
|
||||
-define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized).
|
||||
-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error).
|
||||
-define(TIME_WINDOW, <<"1s">>).
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
@ -59,6 +60,11 @@ end_per_suite(Config) ->
|
|||
emqx_cth_suite:stop(?config(suite_apps, Config)),
|
||||
emqx_config:delete_override_conf_files().
|
||||
|
||||
init_per_testcase(t_throttle_recoverable_msg, Config) ->
|
||||
ok = snabbkaffe:start_trace(),
|
||||
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
|
||||
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}),
|
||||
Config;
|
||||
init_per_testcase(t_throttle_add_new_msg, Config) ->
|
||||
ok = snabbkaffe:start_trace(),
|
||||
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
|
||||
|
@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) ->
|
|||
ok = snabbkaffe:start_trace(),
|
||||
Config.
|
||||
|
||||
end_per_testcase(t_throttle_recoverable_msg, _Config) ->
|
||||
ok = snabbkaffe:stop(),
|
||||
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
|
||||
ok;
|
||||
end_per_testcase(t_throttle_add_new_msg, _Config) ->
|
||||
ok = snabbkaffe:stop(),
|
||||
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
|
||||
|
@ -101,8 +111,8 @@ t_throttle(_Config) ->
|
|||
5000
|
||||
),
|
||||
|
||||
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
|
||||
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
|
||||
?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
|
||||
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
|
||||
{ok, _} = ?block_until(
|
||||
#{
|
||||
?snk_kind := log_throttler_dropped,
|
||||
|
@ -115,14 +125,48 @@ t_throttle(_Config) ->
|
|||
[]
|
||||
).
|
||||
|
||||
t_throttle_recoverable_msg(_Config) ->
|
||||
ResourceId = <<"resource_id">>,
|
||||
ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]),
|
||||
?check_trace(
|
||||
begin
|
||||
%% Warm-up and block to increase the probability that next events
|
||||
%% will be in the same throttling time window.
|
||||
{ok, _} = ?block_until(
|
||||
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG},
|
||||
5000
|
||||
),
|
||||
{_, {ok, _}} = ?wait_async_action(
|
||||
events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId),
|
||||
#{
|
||||
?snk_kind := log_throttler_dropped,
|
||||
throttled_msg := ThrottledMsg
|
||||
},
|
||||
5000
|
||||
),
|
||||
|
||||
?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
|
||||
?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
|
||||
{ok, _} = ?block_until(
|
||||
#{
|
||||
?snk_kind := log_throttler_dropped,
|
||||
throttled_msg := ThrottledMsg,
|
||||
dropped_count := 1
|
||||
},
|
||||
3000
|
||||
)
|
||||
end,
|
||||
[]
|
||||
).
|
||||
|
||||
t_throttle_add_new_msg(_Config) ->
|
||||
?check_trace(
|
||||
begin
|
||||
{ok, _} = ?block_until(
|
||||
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
|
||||
),
|
||||
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)),
|
||||
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)),
|
||||
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
|
||||
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
|
||||
{ok, _} = ?block_until(
|
||||
#{
|
||||
?snk_kind := log_throttler_dropped,
|
||||
|
@ -137,10 +181,15 @@ t_throttle_add_new_msg(_Config) ->
|
|||
|
||||
t_throttle_no_msg(_Config) ->
|
||||
%% Must simply pass with no crashes
|
||||
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
|
||||
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
|
||||
timer:sleep(10),
|
||||
?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
|
||||
Pid = erlang:whereis(emqx_log_throttler),
|
||||
?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
|
||||
?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
|
||||
%% assert process is not restarted
|
||||
?assertEqual(Pid, erlang:whereis(emqx_log_throttler)),
|
||||
%% make a gen_call to ensure the process is alive
|
||||
%% note: this call result in an 'unexpected_call' error log.
|
||||
?assertEqual(ignored, gen_server:call(Pid, probe)),
|
||||
ok.
|
||||
|
||||
t_update_time_window(_Config) ->
|
||||
?check_trace(
|
||||
|
@ -168,8 +217,8 @@ t_throttle_debug_primary_level(_Config) ->
|
|||
#{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG},
|
||||
5000
|
||||
),
|
||||
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
|
||||
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
|
||||
?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
|
||||
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
|
||||
{ok, _} = ?block_until(
|
||||
#{
|
||||
?snk_kind := log_throttler_dropped,
|
||||
|
@ -187,10 +236,13 @@ t_throttle_debug_primary_level(_Config) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
events(Msg) ->
|
||||
events(100, Msg).
|
||||
events(100, Msg, undefined).
|
||||
|
||||
events(N, Msg) ->
|
||||
[emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)].
|
||||
events(Msg, Id) ->
|
||||
events(100, Msg, Id).
|
||||
|
||||
events(N, Msg, Id) ->
|
||||
[emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)].
|
||||
|
||||
module_exists(Mod) ->
|
||||
case erlang:module_loaded(Mod) of
|
||||
|
|
|
@ -377,42 +377,60 @@ t_will_msg(_) ->
|
|||
|
||||
t_format(_) ->
|
||||
io:format("~ts", [
|
||||
emqx_packet:format(#mqtt_packet{
|
||||
emqx_packet:format(
|
||||
#mqtt_packet{
|
||||
header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0},
|
||||
variable = undefined
|
||||
})
|
||||
]),
|
||||
io:format("~ts", [
|
||||
emqx_packet:format(#mqtt_packet{
|
||||
header = #mqtt_packet_header{type = ?CONNACK}, variable = 1, payload = <<"payload">>
|
||||
})
|
||||
},
|
||||
text
|
||||
)
|
||||
]),
|
||||
io:format(
|
||||
"~ts",
|
||||
[
|
||||
emqx_packet:format(
|
||||
#mqtt_packet{
|
||||
header = #mqtt_packet_header{type = ?CONNACK},
|
||||
variable = 1,
|
||||
payload = <<"payload">>
|
||||
},
|
||||
text
|
||||
)
|
||||
]
|
||||
),
|
||||
io:format("~ts", [
|
||||
emqx_packet:format(
|
||||
?CONNECT_PACKET(#mqtt_packet_connect{
|
||||
?CONNECT_PACKET(
|
||||
#mqtt_packet_connect{
|
||||
will_flag = true,
|
||||
will_retain = true,
|
||||
will_qos = ?QOS_2,
|
||||
will_topic = <<"topic">>,
|
||||
will_payload = <<"payload">>
|
||||
})
|
||||
}
|
||||
),
|
||||
text
|
||||
)
|
||||
]),
|
||||
io:format("~ts", [
|
||||
emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}))
|
||||
emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}), text)
|
||||
]),
|
||||
io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
|
||||
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
|
||||
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
|
||||
io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
|
||||
io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99))]),
|
||||
io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER), text)]),
|
||||
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1), text)]),
|
||||
io:format("~ts", [
|
||||
emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))
|
||||
emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>), text)
|
||||
]),
|
||||
io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
|
||||
io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
|
||||
io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90))]),
|
||||
io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128))]).
|
||||
io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98), text)]),
|
||||
io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99), text)]),
|
||||
io:format("~ts", [
|
||||
emqx_packet:format(
|
||||
?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]), text
|
||||
)
|
||||
]),
|
||||
io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]), text)]),
|
||||
io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]), text)]),
|
||||
io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90), text)]),
|
||||
io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128), text)]).
|
||||
|
||||
t_parse_empty_publish(_) ->
|
||||
%% 52: 0011(type=PUBLISH) 0100 (QoS=2)
|
||||
|
|
|
@ -573,7 +573,7 @@ app_specs(Opts) ->
|
|||
|
||||
cluster() ->
|
||||
ExtraConf = "\n durable_storage.messages.n_sites = 2",
|
||||
Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})},
|
||||
Spec = #{apps => app_specs(#{extra_emqx_conf => ExtraConf})},
|
||||
[
|
||||
{persistent_messages_SUITE1, Spec},
|
||||
{persistent_messages_SUITE2, Spec}
|
||||
|
|
|
@ -64,10 +64,17 @@ init_per_group(routing_schema_v2, Config) ->
|
|||
init_per_group(batch_sync_on, Config) ->
|
||||
[{emqx_config, "broker.routing.batch_sync.enable_on = all"} | Config];
|
||||
init_per_group(batch_sync_replicants, Config) ->
|
||||
case emqx_cth_suite:skip_if_oss() of
|
||||
false ->
|
||||
[{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config];
|
||||
True ->
|
||||
True
|
||||
end;
|
||||
init_per_group(batch_sync_off, Config) ->
|
||||
[{emqx_config, "broker.routing.batch_sync.enable_on = none"} | Config];
|
||||
init_per_group(cluster, Config) ->
|
||||
case emqx_cth_suite:skip_if_oss() of
|
||||
false ->
|
||||
WorkDir = emqx_cth_suite:work_dir(Config),
|
||||
NodeSpecs = [
|
||||
{emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(1, Config)], role => core}},
|
||||
|
@ -76,6 +83,9 @@ init_per_group(cluster, Config) ->
|
|||
],
|
||||
Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
|
||||
[{cluster, Nodes} | Config];
|
||||
True ->
|
||||
True
|
||||
end;
|
||||
init_per_group(GroupName, Config) when
|
||||
GroupName =:= single_batch_on;
|
||||
GroupName =:= single
|
||||
|
|
|
@ -1247,7 +1247,7 @@ recv_msgs(Count, Msgs) ->
|
|||
start_peer(Name, Port) ->
|
||||
{ok, Node} = emqx_cth_peer:start_link(
|
||||
Name,
|
||||
ebin_path()
|
||||
emqx_common_test_helpers:ebin_path()
|
||||
),
|
||||
pong = net_adm:ping(Node),
|
||||
setup_node(Node, Port),
|
||||
|
@ -1261,9 +1261,6 @@ host() ->
|
|||
[_, Host] = string:tokens(atom_to_list(node()), "@"),
|
||||
Host.
|
||||
|
||||
ebin_path() ->
|
||||
["-pa" | code:get_path()].
|
||||
|
||||
setup_node(Node, Port) ->
|
||||
EnvHandler =
|
||||
fun(_) ->
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_auth, [
|
||||
{description, "EMQX Authentication and authorization"},
|
||||
{vsn, "0.3.3"},
|
||||
{vsn, "0.3.4"},
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_sup]},
|
||||
{applications, [
|
||||
|
|
|
@ -203,6 +203,7 @@ common_fields() ->
|
|||
|
||||
enable(type) -> boolean();
|
||||
enable(default) -> true;
|
||||
enable(importance) -> ?IMPORTANCE_NO_DOC;
|
||||
enable(desc) -> ?DESC(?FUNCTION_NAME);
|
||||
enable(_) -> undefined.
|
||||
|
||||
|
|
|
@ -477,9 +477,15 @@ authorize_deny(
|
|||
sources()
|
||||
) ->
|
||||
authz_result().
|
||||
authorize(Client, PubSub, Topic, _DefaultResult, Sources) ->
|
||||
authorize(#{username := Username} = Client, PubSub, Topic, _DefaultResult, Sources) ->
|
||||
case maps:get(is_superuser, Client, false) of
|
||||
true ->
|
||||
?tp(authz_skipped, #{reason => client_is_superuser, action => PubSub}),
|
||||
?TRACE("AUTHZ", "authorization_skipped_as_superuser", #{
|
||||
username => Username,
|
||||
topic => Topic,
|
||||
action => emqx_access_control:format_action(PubSub)
|
||||
}),
|
||||
emqx_metrics:inc(?METRIC_SUPERUSER),
|
||||
{stop, #{result => allow, from => superuser}};
|
||||
false ->
|
||||
|
|
|
@ -170,7 +170,12 @@ api_authz_refs() ->
|
|||
authz_common_fields(Type) ->
|
||||
[
|
||||
{type, ?HOCON(Type, #{required => true, desc => ?DESC(type)})},
|
||||
{enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}
|
||||
{enable,
|
||||
?HOCON(boolean(), #{
|
||||
default => true,
|
||||
importance => ?IMPORTANCE_NO_DOC,
|
||||
desc => ?DESC(enable)
|
||||
})}
|
||||
].
|
||||
|
||||
source_types() ->
|
||||
|
|
|
@ -674,5 +674,77 @@ t_publish_last_will_testament_banned_client_connecting(_Config) ->
|
|||
|
||||
ok.
|
||||
|
||||
t_sikpped_as_superuser(_Config) ->
|
||||
ClientInfo = #{
|
||||
clientid => <<"clientid">>,
|
||||
username => <<"username">>,
|
||||
peerhost => {127, 0, 0, 1},
|
||||
zone => default,
|
||||
listener => {tcp, default},
|
||||
is_superuser => true
|
||||
},
|
||||
?check_trace(
|
||||
begin
|
||||
?assertEqual(
|
||||
allow,
|
||||
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_0), <<"p/t/0">>)
|
||||
),
|
||||
?assertEqual(
|
||||
allow,
|
||||
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_1), <<"p/t/1">>)
|
||||
),
|
||||
?assertEqual(
|
||||
allow,
|
||||
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_2), <<"p/t/2">>)
|
||||
),
|
||||
?assertEqual(
|
||||
allow,
|
||||
emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_0), <<"s/t/0">>)
|
||||
),
|
||||
?assertEqual(
|
||||
allow,
|
||||
emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_1), <<"s/t/1">>)
|
||||
),
|
||||
?assertEqual(
|
||||
allow,
|
||||
emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_2), <<"s/t/2">>)
|
||||
)
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertMatch(
|
||||
[
|
||||
#{
|
||||
reason := client_is_superuser,
|
||||
action := #{qos := ?QOS_0, action_type := publish}
|
||||
},
|
||||
#{
|
||||
reason := client_is_superuser,
|
||||
action := #{qos := ?QOS_1, action_type := publish}
|
||||
},
|
||||
#{
|
||||
reason := client_is_superuser,
|
||||
action := #{qos := ?QOS_2, action_type := publish}
|
||||
},
|
||||
#{
|
||||
reason := client_is_superuser,
|
||||
action := #{qos := ?QOS_0, action_type := subscribe}
|
||||
},
|
||||
#{
|
||||
reason := client_is_superuser,
|
||||
action := #{qos := ?QOS_1, action_type := subscribe}
|
||||
},
|
||||
#{
|
||||
reason := client_is_superuser,
|
||||
action := #{qos := ?QOS_2, action_type := subscribe}
|
||||
}
|
||||
],
|
||||
?of_kind(authz_skipped, Trace)
|
||||
),
|
||||
ok
|
||||
end
|
||||
),
|
||||
|
||||
ok = snabbkaffe:stop().
|
||||
|
||||
stop_apps(Apps) ->
|
||||
lists:foreach(fun application:stop/1, Apps).
|
||||
|
|
|
@ -118,8 +118,8 @@ mk_cluster_spec(Opts) ->
|
|||
Node1Apps = Apps ++ [{emqx_dashboard, "dashboard.listeners.http {enable=true,bind=18083}"}],
|
||||
Node2Apps = Apps,
|
||||
[
|
||||
{emqx_authz_api_cluster_SUITE1, Opts#{role => core, apps => Node1Apps}},
|
||||
{emqx_authz_api_cluster_SUITE2, Opts#{role => core, apps => Node2Apps}}
|
||||
{emqx_authz_api_cluster_SUITE1, Opts#{apps => Node1Apps}},
|
||||
{emqx_authz_api_cluster_SUITE2, Opts#{apps => Node2Apps}}
|
||||
].
|
||||
|
||||
request(Method, URL, Body, Config) ->
|
||||
|
|
|
@ -31,4 +31,6 @@
|
|||
-define(AUTHN_TYPE, {?AUTHN_MECHANISM, ?AUTHN_BACKEND}).
|
||||
-define(AUTHN_TYPE_SCRAM, {?AUTHN_MECHANISM_SCRAM, ?AUTHN_BACKEND}).
|
||||
|
||||
-define(AUTHN_DATA_FIELDS, [is_superuser, client_attrs, expire_at, acl]).
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_auth_http, [
|
||||
{description, "EMQX External HTTP API Authentication and Authorization"},
|
||||
{vsn, "0.3.0"},
|
||||
{vsn, "0.3.1"},
|
||||
{registered, []},
|
||||
{mod, {emqx_auth_http_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
start(_StartType, _StartArgs) ->
|
||||
ok = emqx_authz:register_source(?AUTHZ_TYPE, emqx_authz_http),
|
||||
ok = emqx_authn:register_provider(?AUTHN_TYPE, emqx_authn_http),
|
||||
ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_http),
|
||||
ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_restapi),
|
||||
{ok, Sup} = emqx_auth_http_sup:start_link(),
|
||||
{ok, Sup}.
|
||||
|
||||
|
|
|
@ -32,7 +32,9 @@
|
|||
with_validated_config/2,
|
||||
generate_request/2,
|
||||
request_for_log/2,
|
||||
response_for_log/1
|
||||
response_for_log/1,
|
||||
extract_auth_data/2,
|
||||
safely_parse_body/2
|
||||
]).
|
||||
|
||||
-define(DEFAULT_CONTENT_TYPE, <<"application/json">>).
|
||||
|
@ -194,23 +196,28 @@ handle_response(Headers, Body) ->
|
|||
case safely_parse_body(ContentType, Body) of
|
||||
{ok, NBody} ->
|
||||
body_to_auth_data(NBody);
|
||||
{error, Reason} ->
|
||||
?TRACE_AUTHN_PROVIDER(
|
||||
error,
|
||||
"parse_http_response_failed",
|
||||
#{content_type => ContentType, body => Body, reason => Reason}
|
||||
),
|
||||
{error, _Reason} ->
|
||||
ignore
|
||||
end.
|
||||
|
||||
body_to_auth_data(Body) ->
|
||||
case maps:get(<<"result">>, Body, <<"ignore">>) of
|
||||
<<"allow">> ->
|
||||
extract_auth_data(http, Body);
|
||||
<<"deny">> ->
|
||||
{error, not_authorized};
|
||||
<<"ignore">> ->
|
||||
ignore;
|
||||
_ ->
|
||||
ignore
|
||||
end.
|
||||
|
||||
extract_auth_data(Source, Body) ->
|
||||
IsSuperuser = emqx_authn_utils:is_superuser(Body),
|
||||
Attrs = emqx_authn_utils:client_attrs(Body),
|
||||
try
|
||||
ExpireAt = expire_at(Body),
|
||||
ACL = acl(ExpireAt, Body),
|
||||
ACL = acl(ExpireAt, Source, Body),
|
||||
Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
|
||||
{ok, Result}
|
||||
catch
|
||||
|
@ -221,13 +228,6 @@ body_to_auth_data(Body) ->
|
|||
throw:Reason ->
|
||||
?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}),
|
||||
{error, bad_username_or_password}
|
||||
end;
|
||||
<<"deny">> ->
|
||||
{error, not_authorized};
|
||||
<<"ignore">> ->
|
||||
ignore;
|
||||
_ ->
|
||||
ignore
|
||||
end.
|
||||
|
||||
merge_maps([]) -> #{};
|
||||
|
@ -268,40 +268,43 @@ expire_sec(#{<<"expire_at">> := _}) ->
|
|||
expire_sec(_) ->
|
||||
undefined.
|
||||
|
||||
acl(#{expire_at := ExpireTimeMs}, #{<<"acl">> := Rules}) ->
|
||||
acl(#{expire_at := ExpireTimeMs}, Source, #{<<"acl">> := Rules}) ->
|
||||
#{
|
||||
acl => #{
|
||||
source_for_logging => http,
|
||||
source_for_logging => Source,
|
||||
rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules),
|
||||
%% It's seconds level precision (like JWT) for authz
|
||||
%% see emqx_authz_client_info:check/1
|
||||
expire => erlang:convert_time_unit(ExpireTimeMs, millisecond, second)
|
||||
}
|
||||
};
|
||||
acl(_NoExpire, #{<<"acl">> := Rules}) ->
|
||||
acl(_NoExpire, Source, #{<<"acl">> := Rules}) ->
|
||||
#{
|
||||
acl => #{
|
||||
source_for_logging => http,
|
||||
source_for_logging => Source,
|
||||
rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules)
|
||||
}
|
||||
};
|
||||
acl(_, _) ->
|
||||
acl(_, _, _) ->
|
||||
#{}.
|
||||
|
||||
safely_parse_body(ContentType, Body) ->
|
||||
try
|
||||
parse_body(ContentType, Body)
|
||||
catch
|
||||
_Class:_Reason ->
|
||||
_Class:Reason ->
|
||||
?TRACE_AUTHN_PROVIDER(
|
||||
error,
|
||||
"parse_http_response_failed",
|
||||
#{content_type => ContentType, body => Body, reason => Reason}
|
||||
),
|
||||
{error, invalid_body}
|
||||
end.
|
||||
|
||||
parse_body(<<"application/json", _/binary>>, Body) ->
|
||||
{ok, emqx_utils_json:decode(Body, [return_maps])};
|
||||
parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) ->
|
||||
Flags = [<<"result">>, <<"is_superuser">>],
|
||||
RawMap = maps:from_list(cow_qs:parse_qs(Body)),
|
||||
NBody = maps:with(Flags, RawMap),
|
||||
NBody = maps:from_list(cow_qs:parse_qs(Body)),
|
||||
{ok, NBody};
|
||||
parse_body(ContentType, _) ->
|
||||
{error, {unsupported_content_type, ContentType}}.
|
||||
|
|
|
@ -2,10 +2,19 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_authn_scram_http).
|
||||
%% Note:
|
||||
%% This is not an implementation of the RFC 7804:
|
||||
%% Salted Challenge Response HTTP Authentication Mechanism.
|
||||
%% This backend is an implementation of scram,
|
||||
%% which uses an external web resource as a source of user information.
|
||||
|
||||
-include_lib("emqx_auth/include/emqx_authn.hrl").
|
||||
-module(emqx_authn_scram_restapi).
|
||||
|
||||
-feature(maybe_expr, enable).
|
||||
|
||||
-include("emqx_auth_http.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx_auth/include/emqx_authn.hrl").
|
||||
|
||||
-behaviour(emqx_authn_provider).
|
||||
|
||||
|
@ -22,10 +31,6 @@
|
|||
<<"salt">>
|
||||
]).
|
||||
|
||||
-define(OPTIONAL_USER_INFO_KEYS, [
|
||||
<<"is_superuser">>
|
||||
]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -72,7 +77,9 @@ authenticate(
|
|||
reason => Reason
|
||||
})
|
||||
end,
|
||||
emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State);
|
||||
emqx_utils_scram:authenticate(
|
||||
AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, ?AUTHN_DATA_FIELDS
|
||||
);
|
||||
authenticate(_Credential, _State) ->
|
||||
ignore.
|
||||
|
||||
|
@ -95,7 +102,7 @@ retrieve(
|
|||
) ->
|
||||
Request = emqx_authn_http:generate_request(Credential#{username := Username}, State),
|
||||
Response = emqx_resource:simple_sync_query(ResourceId, {Method, Request, RequestTimeout}),
|
||||
?TRACE_AUTHN_PROVIDER("scram_http_response", #{
|
||||
?TRACE_AUTHN_PROVIDER("scram_restapi_response", #{
|
||||
request => emqx_authn_http:request_for_log(Credential, State),
|
||||
response => emqx_authn_http:response_for_log(Response),
|
||||
resource => ResourceId
|
||||
|
@ -113,16 +120,11 @@ retrieve(
|
|||
|
||||
handle_response(Headers, Body) ->
|
||||
ContentType = proplists:get_value(<<"content-type">>, Headers),
|
||||
case safely_parse_body(ContentType, Body) of
|
||||
{ok, NBody} ->
|
||||
body_to_user_info(NBody);
|
||||
{error, Reason} = Error ->
|
||||
?TRACE_AUTHN_PROVIDER(
|
||||
error,
|
||||
"parse_scram_http_response_failed",
|
||||
#{content_type => ContentType, body => Body, reason => Reason}
|
||||
),
|
||||
Error
|
||||
maybe
|
||||
{ok, NBody} ?= emqx_authn_http:safely_parse_body(ContentType, Body),
|
||||
{ok, UserInfo} ?= body_to_user_info(NBody),
|
||||
{ok, AuthData} ?= emqx_authn_http:extract_auth_data(scram_restapi, NBody),
|
||||
{ok, maps:merge(AuthData, UserInfo)}
|
||||
end.
|
||||
|
||||
body_to_user_info(Body) ->
|
||||
|
@ -131,26 +133,16 @@ body_to_user_info(Body) ->
|
|||
true ->
|
||||
case safely_convert_hex(Required0) of
|
||||
{ok, Required} ->
|
||||
UserInfo0 = maps:merge(Required, maps:with(?OPTIONAL_USER_INFO_KEYS, Body)),
|
||||
UserInfo1 = emqx_utils_maps:safe_atom_key_map(UserInfo0),
|
||||
UserInfo = maps:merge(#{is_superuser => false}, UserInfo1),
|
||||
{ok, UserInfo};
|
||||
{ok, emqx_utils_maps:safe_atom_key_map(Required)};
|
||||
Error ->
|
||||
?TRACE_AUTHN_PROVIDER("decode_keys_failed", #{http_body => Body}),
|
||||
Error
|
||||
end;
|
||||
_ ->
|
||||
?TRACE_AUTHN_PROVIDER("bad_response_body", #{http_body => Body}),
|
||||
?TRACE_AUTHN_PROVIDER("missing_requried_keys", #{http_body => Body}),
|
||||
{error, bad_response}
|
||||
end.
|
||||
|
||||
safely_parse_body(ContentType, Body) ->
|
||||
try
|
||||
parse_body(ContentType, Body)
|
||||
catch
|
||||
_Class:_Reason ->
|
||||
{error, invalid_body}
|
||||
end.
|
||||
|
||||
safely_convert_hex(Required) ->
|
||||
try
|
||||
{ok,
|
||||
|
@ -165,15 +157,5 @@ safely_convert_hex(Required) ->
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
parse_body(<<"application/json", _/binary>>, Body) ->
|
||||
{ok, emqx_utils_json:decode(Body, [return_maps])};
|
||||
parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) ->
|
||||
Flags = ?REQUIRED_USER_INFO_KEYS ++ ?OPTIONAL_USER_INFO_KEYS,
|
||||
RawMap = maps:from_list(cow_qs:parse_qs(Body)),
|
||||
NBody = maps:with(Flags, RawMap),
|
||||
{ok, NBody};
|
||||
parse_body(ContentType, _) ->
|
||||
{error, {unsupported_content_type, ContentType}}.
|
||||
|
||||
merge_scram_conf(Conf, State) ->
|
||||
maps:merge(maps:with([algorithm, iteration_count], Conf), State).
|
|
@ -2,7 +2,7 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_authn_scram_http_schema).
|
||||
-module(emqx_authn_scram_restapi_schema).
|
||||
|
||||
-behaviour(emqx_authn_schema).
|
||||
|
||||
|
@ -22,16 +22,16 @@
|
|||
namespace() -> "authn".
|
||||
|
||||
refs() ->
|
||||
[?R_REF(scram_http_get), ?R_REF(scram_http_post)].
|
||||
[?R_REF(scram_restapi_get), ?R_REF(scram_restapi_post)].
|
||||
|
||||
select_union_member(
|
||||
#{<<"mechanism">> := ?AUTHN_MECHANISM_SCRAM_BIN, <<"backend">> := ?AUTHN_BACKEND_BIN} = Value
|
||||
) ->
|
||||
case maps:get(<<"method">>, Value, undefined) of
|
||||
<<"get">> ->
|
||||
[?R_REF(scram_http_get)];
|
||||
[?R_REF(scram_restapi_get)];
|
||||
<<"post">> ->
|
||||
[?R_REF(scramm_http_post)];
|
||||
[?R_REF(scram_restapi_post)];
|
||||
Else ->
|
||||
throw(#{
|
||||
reason => "unknown_http_method",
|
||||
|
@ -43,20 +43,20 @@ select_union_member(
|
|||
select_union_member(_Value) ->
|
||||
undefined.
|
||||
|
||||
fields(scram_http_get) ->
|
||||
fields(scram_restapi_get) ->
|
||||
[
|
||||
{method, #{type => get, required => true, desc => ?DESC(emqx_authn_http_schema, method)}},
|
||||
{headers, fun emqx_authn_http_schema:headers_no_content_type/1}
|
||||
] ++ common_fields();
|
||||
fields(scram_http_post) ->
|
||||
fields(scram_restapi_post) ->
|
||||
[
|
||||
{method, #{type => post, required => true, desc => ?DESC(emqx_authn_http_schema, method)}},
|
||||
{headers, fun emqx_authn_http_schema:headers/1}
|
||||
] ++ common_fields().
|
||||
|
||||
desc(scram_http_get) ->
|
||||
desc(scram_restapi_get) ->
|
||||
?DESC(emqx_authn_http_schema, get);
|
||||
desc(scram_http_post) ->
|
||||
desc(scram_restapi_post) ->
|
||||
?DESC(emqx_authn_http_schema, post);
|
||||
desc(_) ->
|
||||
undefined.
|
|
@ -2,7 +2,7 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_authn_scram_http_SUITE).
|
||||
-module(emqx_authn_scram_restapi_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
@ -21,6 +21,9 @@
|
|||
-define(ALGORITHM_STR, <<"sha512">>).
|
||||
-define(ITERATION_COUNT, 4096).
|
||||
|
||||
-define(T_ACL_USERNAME, <<"username">>).
|
||||
-define(T_ACL_PASSWORD, <<"password">>).
|
||||
|
||||
-include_lib("emqx/include/emqx_placeholder.hrl").
|
||||
|
||||
all() ->
|
||||
|
@ -54,11 +57,11 @@ init_per_testcase(_Case, Config) ->
|
|||
[authentication],
|
||||
?GLOBAL
|
||||
),
|
||||
{ok, _} = emqx_authn_scram_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH),
|
||||
{ok, _} = emqx_authn_scram_restapi_test_server:start_link(?HTTP_PORT, ?HTTP_PATH),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_Case, _Config) ->
|
||||
ok = emqx_authn_scram_http_test_server:stop().
|
||||
ok = emqx_authn_scram_restapi_test_server:stop().
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Tests
|
||||
|
@ -72,7 +75,9 @@ t_create(_Config) ->
|
|||
{create_authenticator, ?GLOBAL, AuthConfig}
|
||||
),
|
||||
|
||||
{ok, [#{provider := emqx_authn_scram_http}]} = emqx_authn_chains:list_authenticators(?GLOBAL).
|
||||
{ok, [#{provider := emqx_authn_scram_restapi}]} = emqx_authn_chains:list_authenticators(
|
||||
?GLOBAL
|
||||
).
|
||||
|
||||
t_create_invalid(_Config) ->
|
||||
AuthConfig = raw_config(),
|
||||
|
@ -118,59 +123,8 @@ t_authenticate(_Config) ->
|
|||
|
||||
ok = emqx_config:put([mqtt, idle_timeout], 500),
|
||||
|
||||
{ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
|
||||
|
||||
ClientFirstMessage = esasl_scram:client_first_message(Username),
|
||||
|
||||
ConnectPacket = ?CONNECT_PACKET(
|
||||
#mqtt_packet_connect{
|
||||
proto_ver = ?MQTT_PROTO_V5,
|
||||
properties = #{
|
||||
'Authentication-Method' => <<"SCRAM-SHA-512">>,
|
||||
'Authentication-Data' => ClientFirstMessage
|
||||
}
|
||||
}
|
||||
),
|
||||
|
||||
ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket),
|
||||
|
||||
%% Intentional sleep to trigger idle timeout for the connection not yet authenticated
|
||||
ok = ct:sleep(1000),
|
||||
|
||||
?AUTH_PACKET(
|
||||
?RC_CONTINUE_AUTHENTICATION,
|
||||
#{'Authentication-Data' := ServerFirstMessage}
|
||||
) = receive_packet(),
|
||||
|
||||
{continue, ClientFinalMessage, ClientCache} =
|
||||
esasl_scram:check_server_first_message(
|
||||
ServerFirstMessage,
|
||||
#{
|
||||
client_first_message => ClientFirstMessage,
|
||||
password => Password,
|
||||
algorithm => ?ALGORITHM
|
||||
}
|
||||
),
|
||||
|
||||
AuthContinuePacket = ?AUTH_PACKET(
|
||||
?RC_CONTINUE_AUTHENTICATION,
|
||||
#{
|
||||
'Authentication-Method' => <<"SCRAM-SHA-512">>,
|
||||
'Authentication-Data' => ClientFinalMessage
|
||||
}
|
||||
),
|
||||
|
||||
ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket),
|
||||
|
||||
?CONNACK_PACKET(
|
||||
?RC_SUCCESS,
|
||||
_,
|
||||
#{'Authentication-Data' := ServerFinalMessage}
|
||||
) = receive_packet(),
|
||||
|
||||
ok = esasl_scram:check_server_final_message(
|
||||
ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM}
|
||||
).
|
||||
{ok, Pid} = create_connection(Username, Password),
|
||||
emqx_authn_mqtt_test_client:stop(Pid).
|
||||
|
||||
t_authenticate_bad_props(_Config) ->
|
||||
Username = <<"u">>,
|
||||
|
@ -314,6 +268,47 @@ t_destroy(_Config) ->
|
|||
_
|
||||
) = receive_packet().
|
||||
|
||||
t_acl(_Config) ->
|
||||
init_auth(),
|
||||
|
||||
ACL = emqx_authn_http_SUITE:acl_rules(),
|
||||
set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{acl => ACL}),
|
||||
{ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD),
|
||||
|
||||
Cases = [
|
||||
{allow, <<"http-authn-acl/#">>},
|
||||
{deny, <<"http-authn-acl/1">>},
|
||||
{deny, <<"t/#">>}
|
||||
],
|
||||
|
||||
try
|
||||
lists:foreach(
|
||||
fun(Case) ->
|
||||
test_acl(Case, Pid)
|
||||
end,
|
||||
Cases
|
||||
)
|
||||
after
|
||||
ok = emqx_authn_mqtt_test_client:stop(Pid)
|
||||
end.
|
||||
|
||||
t_auth_expire(_Config) ->
|
||||
init_auth(),
|
||||
|
||||
ExpireSec = 3,
|
||||
WaitTime = timer:seconds(ExpireSec + 1),
|
||||
ACL = emqx_authn_http_SUITE:acl_rules(),
|
||||
|
||||
set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{
|
||||
acl => ACL,
|
||||
expire_at =>
|
||||
erlang:system_time(second) + ExpireSec
|
||||
}),
|
||||
{ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD),
|
||||
|
||||
timer:sleep(WaitTime),
|
||||
?assertEqual(false, erlang:is_process_alive(Pid)).
|
||||
|
||||
t_is_superuser() ->
|
||||
State = init_auth(),
|
||||
ok = test_is_superuser(State, false),
|
||||
|
@ -324,12 +319,12 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
|
|||
Username = <<"u">>,
|
||||
Password = <<"p">>,
|
||||
|
||||
set_user_handler(Username, Password, ExpectedIsSuperuser),
|
||||
set_user_handler(Username, Password, #{is_superuser => ExpectedIsSuperuser}),
|
||||
|
||||
ClientFirstMessage = esasl_scram:client_first_message(Username),
|
||||
|
||||
{continue, ServerFirstMessage, ServerCache} =
|
||||
emqx_authn_scram_http:authenticate(
|
||||
emqx_authn_scram_restapi:authenticate(
|
||||
#{
|
||||
auth_method => <<"SCRAM-SHA-512">>,
|
||||
auth_data => ClientFirstMessage,
|
||||
|
@ -349,7 +344,7 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
|
|||
),
|
||||
|
||||
{ok, UserInfo1, ServerFinalMessage} =
|
||||
emqx_authn_scram_http:authenticate(
|
||||
emqx_authn_scram_restapi:authenticate(
|
||||
#{
|
||||
auth_method => <<"SCRAM-SHA-512">>,
|
||||
auth_data => ClientFinalMessage,
|
||||
|
@ -382,24 +377,25 @@ raw_config() ->
|
|||
}.
|
||||
|
||||
set_user_handler(Username, Password) ->
|
||||
set_user_handler(Username, Password, false).
|
||||
set_user_handler(Username, Password, IsSuperuser) ->
|
||||
set_user_handler(Username, Password, #{is_superuser => false}).
|
||||
set_user_handler(Username, Password, Extra0) ->
|
||||
%% HTTP Server
|
||||
Handler = fun(Req0, State) ->
|
||||
#{
|
||||
username := Username
|
||||
} = cowboy_req:match_qs([username], Req0),
|
||||
|
||||
UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT, IsSuperuser),
|
||||
UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT),
|
||||
Extra = maps:merge(#{is_superuser => false}, Extra0),
|
||||
Req = cowboy_req:reply(
|
||||
200,
|
||||
#{<<"content-type">> => <<"application/json">>},
|
||||
emqx_utils_json:encode(UserInfo),
|
||||
emqx_utils_json:encode(maps:merge(Extra, UserInfo)),
|
||||
Req0
|
||||
),
|
||||
{ok, Req, State}
|
||||
end,
|
||||
ok = emqx_authn_scram_http_test_server:set_handler(Handler).
|
||||
ok = emqx_authn_scram_restapi_test_server:set_handler(Handler).
|
||||
|
||||
init_auth() ->
|
||||
init_auth(raw_config()).
|
||||
|
@ -413,7 +409,7 @@ init_auth(Config) ->
|
|||
{ok, [#{state := State}]} = emqx_authn_chains:list_authenticators(?GLOBAL),
|
||||
State.
|
||||
|
||||
make_user_info(Password, Algorithm, IterationCount, IsSuperuser) ->
|
||||
make_user_info(Password, Algorithm, IterationCount) ->
|
||||
{StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(
|
||||
Password,
|
||||
#{
|
||||
|
@ -424,8 +420,7 @@ make_user_info(Password, Algorithm, IterationCount, IsSuperuser) ->
|
|||
#{
|
||||
stored_key => binary:encode_hex(StoredKey),
|
||||
server_key => binary:encode_hex(ServerKey),
|
||||
salt => binary:encode_hex(Salt),
|
||||
is_superuser => IsSuperuser
|
||||
salt => binary:encode_hex(Salt)
|
||||
}.
|
||||
|
||||
receive_packet() ->
|
||||
|
@ -436,3 +431,79 @@ receive_packet() ->
|
|||
after 1000 ->
|
||||
ct:fail("Deliver timeout")
|
||||
end.
|
||||
|
||||
create_connection(Username, Password) ->
|
||||
{ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
|
||||
|
||||
ClientFirstMessage = esasl_scram:client_first_message(Username),
|
||||
|
||||
ConnectPacket = ?CONNECT_PACKET(
|
||||
#mqtt_packet_connect{
|
||||
proto_ver = ?MQTT_PROTO_V5,
|
||||
properties = #{
|
||||
'Authentication-Method' => <<"SCRAM-SHA-512">>,
|
||||
'Authentication-Data' => ClientFirstMessage
|
||||
}
|
||||
}
|
||||
),
|
||||
|
||||
ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket),
|
||||
|
||||
%% Intentional sleep to trigger idle timeout for the connection not yet authenticated
|
||||
ok = ct:sleep(1000),
|
||||
|
||||
?AUTH_PACKET(
|
||||
?RC_CONTINUE_AUTHENTICATION,
|
||||
#{'Authentication-Data' := ServerFirstMessage}
|
||||
) = receive_packet(),
|
||||
|
||||
{continue, ClientFinalMessage, ClientCache} =
|
||||
esasl_scram:check_server_first_message(
|
||||
ServerFirstMessage,
|
||||
#{
|
||||
client_first_message => ClientFirstMessage,
|
||||
password => Password,
|
||||
algorithm => ?ALGORITHM
|
||||
}
|
||||
),
|
||||
|
||||
AuthContinuePacket = ?AUTH_PACKET(
|
||||
?RC_CONTINUE_AUTHENTICATION,
|
||||
#{
|
||||
'Authentication-Method' => <<"SCRAM-SHA-512">>,
|
||||
'Authentication-Data' => ClientFinalMessage
|
||||
}
|
||||
),
|
||||
|
||||
ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket),
|
||||
|
||||
?CONNACK_PACKET(
|
||||
?RC_SUCCESS,
|
||||
_,
|
||||
#{'Authentication-Data' := ServerFinalMessage}
|
||||
) = receive_packet(),
|
||||
|
||||
ok = esasl_scram:check_server_final_message(
|
||||
ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM}
|
||||
),
|
||||
{ok, Pid}.
|
||||
|
||||
test_acl({allow, Topic}, C) ->
|
||||
?assertMatch(
|
||||
[0],
|
||||
send_subscribe(C, Topic)
|
||||
);
|
||||
test_acl({deny, Topic}, C) ->
|
||||
?assertMatch(
|
||||
[?RC_NOT_AUTHORIZED],
|
||||
send_subscribe(C, Topic)
|
||||
).
|
||||
|
||||
send_subscribe(Client, Topic) ->
|
||||
TopicOpts = #{nl => 0, rap => 0, rh => 0, qos => 0},
|
||||
Packet = ?SUBSCRIBE_PACKET(1, [{Topic, TopicOpts}]),
|
||||
emqx_authn_mqtt_test_client:send(Client, Packet),
|
||||
timer:sleep(200),
|
||||
|
||||
?SUBACK_PACKET(1, ReasonCode) = receive_packet(),
|
||||
ReasonCode.
|
|
@ -2,7 +2,7 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_authn_scram_http_test_server).
|
||||
-module(emqx_authn_scram_restapi_test_server).
|
||||
|
||||
-behaviour(supervisor).
|
||||
-behaviour(cowboy_handler).
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_auth_jwt, [
|
||||
{description, "EMQX JWT Authentication and Authorization"},
|
||||
{vsn, "0.3.2"},
|
||||
{vsn, "0.3.3"},
|
||||
{registered, []},
|
||||
{mod, {emqx_auth_jwt_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
-include_lib("emqx_auth/include/emqx_authn.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-define(LDAP_HOST, "ldap").
|
||||
-define(LDAP_DEFAULT_PORT, 389).
|
||||
|
@ -46,13 +47,6 @@ init_per_suite(Config) ->
|
|||
Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_ldap], #{
|
||||
work_dir => ?config(priv_dir, Config)
|
||||
}),
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?LDAP_RESOURCE,
|
||||
?AUTHN_RESOURCE_GROUP,
|
||||
emqx_ldap,
|
||||
ldap_config(),
|
||||
#{}
|
||||
),
|
||||
[{apps, Apps} | Config];
|
||||
false ->
|
||||
{skip, no_ldap}
|
||||
|
@ -63,7 +57,6 @@ end_per_suite(Config) ->
|
|||
[authentication],
|
||||
?GLOBAL
|
||||
),
|
||||
ok = emqx_resource:remove_local(?LDAP_RESOURCE),
|
||||
ok = emqx_cth_suite:stop(?config(apps, Config)).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -128,6 +121,87 @@ t_create_invalid(_Config) ->
|
|||
InvalidConfigs
|
||||
).
|
||||
|
||||
t_authenticate_timeout_cause_reconnect(_Config) ->
|
||||
TestPid = self(),
|
||||
meck:new(eldap, [non_strict, no_link, passthrough]),
|
||||
try
|
||||
%% cause eldap process to be killed
|
||||
meck:expect(
|
||||
eldap,
|
||||
search,
|
||||
fun
|
||||
(Pid, [{base, <<"uid=mqttuser0007", _/binary>>} | _]) ->
|
||||
TestPid ! {eldap_pid, Pid},
|
||||
{error, {gen_tcp_error, timeout}};
|
||||
(Pid, Args) ->
|
||||
meck:passthrough([Pid, Args])
|
||||
end
|
||||
),
|
||||
|
||||
Credentials = fun(Username) ->
|
||||
#{
|
||||
username => Username,
|
||||
password => Username,
|
||||
listener => 'tcp:default',
|
||||
protocol => mqtt
|
||||
}
|
||||
end,
|
||||
|
||||
SpecificConfigParams = #{},
|
||||
Result = {ok, #{is_superuser => true}},
|
||||
|
||||
Timeout = 1000,
|
||||
Config0 = raw_ldap_auth_config(),
|
||||
Config = Config0#{
|
||||
<<"pool_size">> => 1,
|
||||
<<"request_timeout">> => Timeout
|
||||
},
|
||||
AuthConfig = maps:merge(Config, SpecificConfigParams),
|
||||
{ok, _} = emqx:update_config(
|
||||
?PATH,
|
||||
{create_authenticator, ?GLOBAL, AuthConfig}
|
||||
),
|
||||
|
||||
%% 0006 is a disabled user
|
||||
?assertEqual(
|
||||
{error, user_disabled},
|
||||
emqx_access_control:authenticate(Credentials(<<"mqttuser0006">>))
|
||||
),
|
||||
?assertEqual(
|
||||
{error, not_authorized},
|
||||
emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))
|
||||
),
|
||||
ok = wait_for_ldap_pid(1000),
|
||||
[#{id := ResourceID}] = emqx_resource_manager:list_all(),
|
||||
?retry(1_000, 10, {ok, connected} = emqx_resource_manager:health_check(ResourceID)),
|
||||
%% turn back to normal
|
||||
meck:expect(
|
||||
eldap,
|
||||
search,
|
||||
2,
|
||||
fun(Pid2, Query) ->
|
||||
meck:passthrough([Pid2, Query])
|
||||
end
|
||||
),
|
||||
%% expect eldap process to be restarted
|
||||
?assertEqual(Result, emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))),
|
||||
emqx_authn_test_lib:delete_authenticators(
|
||||
[authentication],
|
||||
?GLOBAL
|
||||
)
|
||||
after
|
||||
meck:unload(eldap)
|
||||
end.
|
||||
|
||||
wait_for_ldap_pid(After) ->
|
||||
receive
|
||||
{eldap_pid, Pid} ->
|
||||
?assertNot(is_process_alive(Pid)),
|
||||
ok
|
||||
after After ->
|
||||
error(timeout)
|
||||
end.
|
||||
|
||||
t_authenticate(_Config) ->
|
||||
ok = lists:foreach(
|
||||
fun(Sample) ->
|
||||
|
@ -300,6 +374,3 @@ user_seeds() ->
|
|||
|
||||
ldap_server() ->
|
||||
iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])).
|
||||
|
||||
ldap_config() ->
|
||||
emqx_ldap_SUITE:ldap_config([]).
|
||||
|
|
|
@ -44,7 +44,6 @@ init_per_suite(Config) ->
|
|||
],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||
),
|
||||
ok = create_ldap_resource(),
|
||||
[{apps, Apps} | Config];
|
||||
false ->
|
||||
{skip, no_ldap}
|
||||
|
@ -167,21 +166,8 @@ setup_config(SpecialParams) ->
|
|||
ldap_server() ->
|
||||
iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])).
|
||||
|
||||
ldap_config() ->
|
||||
emqx_ldap_SUITE:ldap_config([]).
|
||||
|
||||
start_apps(Apps) ->
|
||||
lists:foreach(fun application:ensure_all_started/1, Apps).
|
||||
|
||||
stop_apps(Apps) ->
|
||||
lists:foreach(fun application:stop/1, Apps).
|
||||
|
||||
create_ldap_resource() ->
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?LDAP_RESOURCE,
|
||||
?AUTHZ_RESOURCE_GROUP,
|
||||
emqx_ldap,
|
||||
ldap_config(),
|
||||
#{}
|
||||
),
|
||||
ok.
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_auth_mnesia, [
|
||||
{description, "EMQX Buitl-in Database Authentication and Authorization"},
|
||||
{vsn, "0.1.6"},
|
||||
{vsn, "0.1.7"},
|
||||
{registered, []},
|
||||
{mod, {emqx_auth_mnesia_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -141,7 +141,9 @@ authenticate(
|
|||
reason => Reason
|
||||
})
|
||||
end,
|
||||
emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State);
|
||||
emqx_utils_scram:authenticate(
|
||||
AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, [is_superuser]
|
||||
);
|
||||
authenticate(_Credential, _State) ->
|
||||
ignore.
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_auth_mongodb, [
|
||||
{description, "EMQX MongoDB Authentication and Authorization"},
|
||||
{vsn, "0.2.1"},
|
||||
{vsn, "0.2.2"},
|
||||
{registered, []},
|
||||
{mod, {emqx_auth_mongodb_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_auth_mysql, [
|
||||
{description, "EMQX MySQL Authentication and Authorization"},
|
||||
{vsn, "0.2.1"},
|
||||
{vsn, "0.2.2"},
|
||||
{registered, []},
|
||||
{mod, {emqx_auth_mysql_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_auth_postgresql, [
|
||||
{description, "EMQX PostgreSQL Authentication and Authorization"},
|
||||
{vsn, "0.2.1"},
|
||||
{vsn, "0.2.2"},
|
||||
{registered, []},
|
||||
{mod, {emqx_auth_postgresql_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_auth_redis, [
|
||||
{description, "EMQX Redis Authentication and Authorization"},
|
||||
{vsn, "0.2.1"},
|
||||
{vsn, "0.2.2"},
|
||||
{registered, []},
|
||||
{mod, {emqx_auth_redis_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_bridge, [
|
||||
{description, "EMQX bridges"},
|
||||
{vsn, "0.2.3"},
|
||||
{vsn, "0.2.4"},
|
||||
{registered, [emqx_bridge_sup]},
|
||||
{mod, {emqx_bridge_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -123,6 +123,7 @@ common_bridge_fields() ->
|
|||
boolean(),
|
||||
#{
|
||||
desc => ?DESC("desc_enable"),
|
||||
importance => ?IMPORTANCE_NO_DOC,
|
||||
default => true
|
||||
}
|
||||
)},
|
||||
|
|
|
@ -65,6 +65,7 @@
|
|||
-export([
|
||||
make_producer_action_schema/1, make_producer_action_schema/2,
|
||||
make_consumer_action_schema/1, make_consumer_action_schema/2,
|
||||
common_fields/0,
|
||||
top_level_common_action_keys/0,
|
||||
top_level_common_source_keys/0,
|
||||
project_to_actions_resource_opts/1,
|
||||
|
@ -507,16 +508,26 @@ make_consumer_action_schema(ParametersRef, Opts) ->
|
|||
})}
|
||||
].
|
||||
|
||||
common_schema(ParametersRef, _Opts) ->
|
||||
common_fields() ->
|
||||
[
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{enable,
|
||||
mk(boolean(), #{
|
||||
desc => ?DESC("config_enable"),
|
||||
importance => ?IMPORTANCE_NO_DOC,
|
||||
default => true
|
||||
})},
|
||||
{connector,
|
||||
mk(binary(), #{
|
||||
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
|
||||
})},
|
||||
{tags, emqx_schema:tags_schema()},
|
||||
{description, emqx_schema:description_schema()},
|
||||
{description, emqx_schema:description_schema()}
|
||||
].
|
||||
|
||||
common_schema(ParametersRef, _Opts) ->
|
||||
[
|
||||
{parameters, ParametersRef}
|
||||
| common_fields()
|
||||
].
|
||||
|
||||
project_to_actions_resource_opts(OldResourceOpts) ->
|
||||
|
|
|
@ -1154,7 +1154,7 @@ t_bridges_probe(Config) ->
|
|||
?assertMatch(
|
||||
{ok, 400, #{
|
||||
<<"code">> := <<"TEST_FAILED">>,
|
||||
<<"message">> := <<"Connection refused">>
|
||||
<<"message">> := <<"Connection refused", _/binary>>
|
||||
}},
|
||||
request_json(
|
||||
post,
|
||||
|
|
|
@ -889,7 +889,8 @@ t_sync_query_down(Config, Opts) ->
|
|||
),
|
||||
|
||||
?force_ordering(
|
||||
#{?snk_kind := call_query},
|
||||
#{?snk_kind := SNKKind} when
|
||||
SNKKind =:= call_query orelse SNKKind =:= simple_query_enter,
|
||||
#{?snk_kind := cut_connection, ?snk_span := start}
|
||||
),
|
||||
%% Note: order of arguments here is reversed compared to `?force_ordering'.
|
||||
|
@ -913,6 +914,7 @@ t_sync_query_down(Config, Opts) ->
|
|||
emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort)
|
||||
)
|
||||
end),
|
||||
?tp("publishing_message", #{}),
|
||||
try
|
||||
{_, {ok, _}} =
|
||||
snabbkaffe:wait_async_action(
|
||||
|
@ -921,6 +923,7 @@ t_sync_query_down(Config, Opts) ->
|
|||
infinity
|
||||
)
|
||||
after
|
||||
?tp("healing_failure", #{}),
|
||||
emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort)
|
||||
end,
|
||||
{ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity),
|
||||
|
|
|
@ -23,7 +23,7 @@ defmodule EMQXBridgeAzureEventHub.MixProject do
|
|||
|
||||
def deps() do
|
||||
[
|
||||
{:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
|
||||
{:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
|
||||
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
|
||||
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
|
||||
{:brod, github: "kafka4beam/brod", tag: "3.18.0"},
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
{erl_opts, [debug_info]}.
|
||||
{deps, [
|
||||
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
|
||||
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
|
||||
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
|
||||
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
|
||||
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_bridge_azure_event_hub, [
|
||||
{description, "EMQX Enterprise Azure Event Hub Bridge"},
|
||||
{vsn, "0.1.7"},
|
||||
{vsn, "0.1.8"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
|
|
@ -129,16 +129,7 @@ fields(actions) ->
|
|||
override(
|
||||
emqx_bridge_kafka:producer_opts(action),
|
||||
bridge_v2_overrides()
|
||||
) ++
|
||||
[
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{connector,
|
||||
mk(binary(), #{
|
||||
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
|
||||
})},
|
||||
{tags, emqx_schema:tags_schema()},
|
||||
{description, emqx_schema:description_schema()}
|
||||
],
|
||||
) ++ emqx_bridge_v2_schema:common_fields(),
|
||||
override_documentations(Fields);
|
||||
fields(Method) ->
|
||||
Fields = emqx_bridge_kafka:fields(Method),
|
||||
|
|
|
@ -382,7 +382,26 @@ t_multiple_actions_sharing_topic(Config) ->
|
|||
ActionConfig0,
|
||||
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
|
||||
),
|
||||
ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
|
||||
ok =
|
||||
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
|
||||
[
|
||||
{type, ?BRIDGE_TYPE_BIN},
|
||||
{connector_name, ?config(connector_name, Config)},
|
||||
{connector_config, ?config(connector_config, Config)},
|
||||
{action_config, ActionConfig}
|
||||
]
|
||||
),
|
||||
ok.
|
||||
|
||||
t_dynamic_topics(Config) ->
|
||||
ActionConfig0 = ?config(action_config, Config),
|
||||
ActionConfig =
|
||||
emqx_utils_maps:deep_merge(
|
||||
ActionConfig0,
|
||||
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
|
||||
),
|
||||
ok =
|
||||
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
|
||||
[
|
||||
{type, ?BRIDGE_TYPE_BIN},
|
||||
{connector_name, ?config(connector_name, Config)},
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue