Compare commits

...

215 Commits

Author SHA1 Message Date
Ivan Dyachkov bcd63344b8
Merge pull request #13583 from id/20240807-sync-release-branches
sync release branches
2024-08-07 11:38:14 +02:00
Ivan Dyachkov cc3b26a3ac Merge remote-tracking branch 'upstream/release-58' into 20240807-sync-release-branches 2024-08-07 09:48:38 +02:00
Ivan Dyachkov dd686c24a0 Merge remote-tracking branch 'upstream/release-57' into 20240807-sync-release-branches 2024-08-07 09:44:38 +02:00
Ivan Dyachkov 592c4e0045
Merge pull request #12774 from emqx/dependabot/github_actions/dot-github/actions/package-macos/actions-package-macos-83d1e47aa6
chore(deps): bump the actions-package-macos group in /.github/actions/package-macos with 1 update
2024-08-07 09:43:53 +02:00
Ivan Dyachkov 073e3ea0a8
Merge pull request #13569 from emqx/dependabot/github_actions/actions-ef71aea555
chore(deps): bump the actions group across 1 directory with 8 updates
2024-08-07 09:35:52 +02:00
Xinyu Liu 81978ceaeb
Merge pull request #13571 from terry-xiaoyu/fast_fail_on_invalid_ssl_opts
chore: update esockd to 5.12.0
2024-08-07 11:21:32 +08:00
Ilia Averianov 6bfddd9952
Merge pull request #13565 from savonarola/0801-shared-subs-compact-structures
Reduce size of shared sub protocol structures
2024-08-06 19:56:08 +03:00
Thales Macedo Garitezi cf608a73a5
Merge pull request #13578 from thalesmg/20240806-r58-port-raft-precond
feat(dsraft): support atomic batches + preconditions (release-58)
2024-08-06 13:40:46 -03:00
Thales Macedo Garitezi a8200fb83d
Merge pull request #13579 from thalesmg/20240806-r58-test-flaky-consumer-rebalance
test: attempt to reduce test flakiness
2024-08-06 13:33:46 -03:00
Ilya Averyanov 9ad65c6ac1 feat(queue): reduce logging levels 2024-08-06 18:45:15 +03:00
Thales Macedo Garitezi 9ca3985bbd test: attempt to reduce test flakiness 2024-08-06 12:44:51 -03:00
Ilya Averyanov e17becb84d feat(queue): compact protocol structures, organize formatting 2024-08-06 18:05:02 +03:00
Andrew Mayorov 5dd8fefded test(ds): avoid side effects in check phase 2024-08-06 11:43:12 -03:00
Andrew Mayorov 7b85faf12a chore(dsraft): fix few spelling errors
Co-Authored-By: Thales Macedo Garitezi <thalesmg@gmail.com>
2024-08-06 11:43:12 -03:00
Andrew Mayorov b0594271b2 chore(dsraft): fix a typespec 2024-08-06 11:43:12 -03:00
Andrew Mayorov d8aa39a310 fix(dsraft): use local application environment 2024-08-06 11:43:12 -03:00
Andrew Mayorov fc0434afc8 chore(dslocal): refine few typespecs 2024-08-06 11:43:12 -03:00
Andrew Mayorov 5502af18b7 feat(ds): support deletions + precondition-related API in bitfield-lts 2024-08-06 11:43:12 -03:00
Andrew Mayorov 9f96e0957e test(ds): verify deletions work predictably 2024-08-06 11:43:12 -03:00
Andrew Mayorov 109ffe7a70 fix(dsbackend): unify timestamp resolution in operations / preconditions 2024-08-06 11:43:12 -03:00
Andrew Mayorov 1559aac486 test(dsbackend): add shared tests for atomic batches + preconditions 2024-08-06 11:43:12 -03:00
Andrew Mayorov 68990f1538 feat(ds): support operations + preconditions in skipstream-lts 2024-08-06 11:43:12 -03:00
Andrew Mayorov 5356d678cc feat(dsraft): support atomic batches + preconditions 2024-08-06 11:43:12 -03:00
Andrew Mayorov 11951f8f6c feat(ds): adopt buffer interface to `emqx_ds:operation()` 2024-08-06 11:43:12 -03:00
Andrew Mayorov 0aa4cdbaf3 feat(ds): add generic preconditions implementation 2024-08-06 11:43:12 -03:00
Ivan Dyachkov 281f8ddc83
Merge pull request #13575 from Kinplemelon/kinple/upgrade-dashboard-58
chore(dashboard): bump dashboard version to v1.10.0-beta.1 & e1.8.0-beta.1
2024-08-06 16:39:06 +02:00
Kinplemelon b80513e941 ci: update emqx docs link in dashboard 2024-08-06 15:21:19 +02:00
Ivan Dyachkov 822ed71282 chore: release 5.7.2 2024-08-06 13:25:56 +02:00
Kinple b8fd5de2a5
Merge pull request #13577 from Kinplemelon/kinple/upgrade-dashboard
chore(dashboard): bump dashboard version to v1.9.2 & e1.7.2
2024-08-06 19:02:50 +08:00
Kinplemelon 3ee84d60ae chore(dashboard): bump dashboard version to v1.9.2 & e1.7.2 2024-08-06 18:11:35 +08:00
Andrew Mayorov 3b52b658cd
Merge pull request #13559 from keynslug/feat/EMQX-12309/raft-precond
feat(dsraft): support atomic batches + preconditions
2024-08-06 09:17:16 +02:00
Kinplemelon cba3dcbeda chore(dashboard): bump dashboard version to v1.10.0-beta.1 & e1.8.0-beta.1 2024-08-06 13:44:16 +08:00
Kinple caf1897979
Merge pull request #13574 from Kinplemelon/kinple/upgrade-dashboard
chore(dashboard): bump dashboard version to e1.7.2-beta.7
2024-08-06 10:51:03 +08:00
Kinplemelon dbbd5e1458 ci: update emqx docs link in dashboard 2024-08-06 09:33:20 +08:00
Kinplemelon 0ab31df9d2 chore(dashboard): bump dashboard version to v1.9.2-beta.1 & e1.7.2-beta.7 2024-08-06 09:32:17 +08:00
Thales Macedo Garitezi 613fc644f5
Merge pull request #13425 from kjellwinblad/kjell/review_connector_error_logs_mqtt_etc/EMQX-12555/EMQX-12657
fix: make MQTT connector error log messages easier to understand
2024-08-05 17:34:13 -03:00
Andrew Mayorov b1a53568d6
test(ds): avoid side effects in check phase 2024-08-05 16:34:17 +02:00
Ivan Dyachkov d6651a1889
Merge pull request #13572 from id/20240805-prep-5.8.0-alpha.1
chore: prepare 5.8.0-alpha.1
2024-08-05 16:05:42 +02:00
Ivan Dyachkov 4cf7151139 chore: prepare 5.8.0-alpha.1 2024-08-05 11:09:07 +02:00
Ivan Dyachkov 4865999606 Merge remote-tracking branch 'upstream/master' into release-58 2024-08-05 10:59:59 +02:00
Andrew Mayorov 382feab7d1
chore(dsraft): fix few spelling errors
Co-Authored-By: Thales Macedo Garitezi <thalesmg@gmail.com>
2024-08-05 10:55:49 +02:00
Andrew Mayorov 6aad774075
chore(dsraft): fix a typespec 2024-08-05 10:55:49 +02:00
Andrew Mayorov 649cbf1c79
fix(dsraft): use local application environment 2024-08-05 10:55:49 +02:00
Andrew Mayorov 4cde5e98a3
chore(dslocal): refine few typespecs 2024-08-05 10:55:48 +02:00
Andrew Mayorov d631b5b296
feat(ds): support deletions + precondition-related API in bitfield-lts 2024-08-05 10:55:48 +02:00
Andrew Mayorov 26ec69d5f4
test(ds): verify deletions work predictably 2024-08-05 10:55:48 +02:00
Andrew Mayorov 58b9ab0210
fix(dsbackend): unify timestamp resolution in operations / preconditions 2024-08-05 10:55:22 +02:00
lafirest 4644072fd8
Merge pull request #13570 from lafirest/fix/api_key_bootstrap
fix(api_key): do not crash boot when the bootstrap file is not exists
2024-08-05 16:33:43 +08:00
Shawn bd87e3ce2b chore: update esockd to 5.12.0 2024-08-05 16:18:04 +08:00
firest c9c4d1a196 fix(api_key): do not crash boot when the bootstrap file is not exists 2024-08-05 15:56:05 +08:00
dependabot[bot] 11546b72f4
chore(deps): bump the actions group across 1 directory with 8 updates
Bumps the actions group with 8 updates in the / directory:

| Package | From | To |
| --- | --- | --- |
| [actions/checkout](https://github.com/actions/checkout) | `4.1.2` | `4.1.7` |
| [actions/upload-artifact](https://github.com/actions/upload-artifact) | `4.3.3` | `4.3.5` |
| [actions/download-artifact](https://github.com/actions/download-artifact) | `4.1.7` | `4.1.8` |
| [docker/setup-qemu-action](https://github.com/docker/setup-qemu-action) | `3.0.0` | `3.2.0` |
| [docker/setup-buildx-action](https://github.com/docker/setup-buildx-action) | `3.3.0` | `3.6.1` |
| [docker/login-action](https://github.com/docker/login-action) | `3.2.0` | `3.3.0` |
| [erlef/setup-beam](https://github.com/erlef/setup-beam) | `1.18.0` | `1.18.1` |
| [ossf/scorecard-action](https://github.com/ossf/scorecard-action) | `2.3.3` | `2.4.0` |



Updates `actions/checkout` from 4.1.2 to 4.1.7
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4.1.2...692973e3d937129bcbf40652eb9f2f61becf3332)

Updates `actions/upload-artifact` from 4.3.3 to 4.3.5
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](65462800fd...89ef406dd8)

Updates `actions/download-artifact` from 4.1.7 to 4.1.8
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](65a9edc588...fa0a91b85d)

Updates `docker/setup-qemu-action` from 3.0.0 to 3.2.0
- [Release notes](https://github.com/docker/setup-qemu-action/releases)
- [Commits](68827325e0...49b3bc8e6b)

Updates `docker/setup-buildx-action` from 3.3.0 to 3.6.1
- [Release notes](https://github.com/docker/setup-buildx-action/releases)
- [Commits](d70bba72b1...988b5a0280)

Updates `docker/login-action` from 3.2.0 to 3.3.0
- [Release notes](https://github.com/docker/login-action/releases)
- [Commits](0d4c9c5ea7...9780b0c442)

Updates `erlef/setup-beam` from 1.18.0 to 1.18.1
- [Release notes](https://github.com/erlef/setup-beam/releases)
- [Commits](a6e26b2231...b9c58b0450)

Updates `ossf/scorecard-action` from 2.3.3 to 2.4.0
- [Release notes](https://github.com/ossf/scorecard-action/releases)
- [Changelog](https://github.com/ossf/scorecard-action/blob/main/RELEASE.md)
- [Commits](dc50aa9510...62b2cac7ed)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: actions/upload-artifact
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: actions/download-artifact
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: docker/setup-qemu-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
- dependency-name: docker/setup-buildx-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
- dependency-name: docker/login-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
- dependency-name: erlef/setup-beam
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: ossf/scorecard-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-08-05 03:25:47 +00:00
dependabot[bot] bcb70a9fb9
chore(deps): bump the actions-package-macos group
Bumps the actions-package-macos group in /.github/actions/package-macos with 1 update: [actions/cache](https://github.com/actions/cache).


Updates `actions/cache` from 4.0.1 to 4.0.2
- [Release notes](https://github.com/actions/cache/releases)
- [Changelog](https://github.com/actions/cache/blob/main/RELEASES.md)
- [Commits](ab5e6d0c87...0c45773b62)

---
updated-dependencies:
- dependency-name: actions/cache
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions-package-macos
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-08-05 03:17:26 +00:00
JimMoen 09ec31908b
Merge pull request #13357 from JimMoen/fix-utf8-frame-error-connack
Stop returning `CONNACK` or `DISCONNECT` to clients that sent malformed CONNECT packets.

- Only send `CONNACK` with reason code `frame_too_large` for MQTT-v5.0 when connecting if the protocol version field in CONNECT can be detected.
- Otherwise **DONOT** send any CONNACK or DISCONNECT packet.
2024-08-02 15:24:30 +08:00
lafirest b94ec4014f
Merge pull request #13563 from lafirest/fix/payload_encode
fix(log): respect payload encoding settings when formatting packets
2024-08-02 14:38:12 +08:00
firest 74c346f9d1 fix(log): respect payload encoding settings when formatting packets 2024-08-02 12:41:30 +08:00
zhongwencool 8a33ef8576
Merge pull request #13562 from zhongwencool/fix-deactivate-alarm
fix: deactivate alarm before create resource
2024-08-02 12:08:27 +08:00
zhongwencool 6c2033ecbf fix: deactivate alarm before create resource 2024-08-02 11:03:59 +08:00
zmstone 51530588ef ci: fix a typo in commented out docker-compose yaml file 2024-08-01 22:41:42 +02:00
Thales Macedo Garitezi bba9d085d6 test: refactor test structure 2024-08-01 16:03:04 -03:00
Thales Macedo Garitezi 3162fe7a27 feat: prettify some error explanations 2024-08-01 15:31:00 -03:00
Thales Macedo Garitezi 52b2d73b28 test: move new test to newer module and use current apis 2024-08-01 15:13:25 -03:00
Thales Macedo Garitezi 44e7f2e9b2 refactor: use macros for status to avoid typos 2024-08-01 14:49:43 -03:00
Thales Macedo Garitezi baf2b96cbc test: refactor test structure 2024-08-01 14:27:25 -03:00
Kjell Winblad ba2d4f3df3 docs: add change log entry 2024-08-01 14:21:27 -03:00
Kjell Winblad 11aaa7b07d fix: make MQTT connector error log messages easier to understand
Fixes:
https://emqx.atlassian.net/browse/EMQX-12555
https://emqx.atlassian.net/browse/EMQX-12657
2024-08-01 14:21:26 -03:00
Thales Macedo Garitezi 4250d01363
Merge pull request #13546 from thalesmg/20240730-r58-pulsar-query-mode
feat: expose `resource_opts.query_mode` for pulsar action
2024-08-01 14:19:16 -03:00
Thales Macedo Garitezi 86853ac6ef
Merge pull request #13545 from thalesmg/20240730-m-connector-jwt-app
refactor: move JWT worker and helpers to separate app
2024-08-01 13:06:27 -03:00
Andrew Mayorov 810a4d3cf9
test(dsbackend): add shared tests for atomic batches + preconditions 2024-08-01 14:26:45 +02:00
Andrew Mayorov 7b243ef7ad
feat(ds): support operations + preconditions in skipstream-lts 2024-08-01 14:26:45 +02:00
Andrew Mayorov fcf76d28ba
feat(dsraft): support atomic batches + preconditions 2024-08-01 14:26:45 +02:00
Andrew Mayorov 3b5d98c1d9
feat(ds): adopt buffer interface to `emqx_ds:operation()` 2024-08-01 14:26:45 +02:00
Andrew Mayorov 451b03ff99
feat(ds): add generic preconditions implementation 2024-08-01 14:26:45 +02:00
JimMoen f792418a68
Merge pull request #13552 from JimMoen/fix-plugin-app-takes-too-long
fix: add a startup timeout limit for the plugin application
2024-08-01 16:46:09 +08:00
JimMoen 4915cc0da6
chore: add changelog entry for 13357 2024-08-01 15:23:58 +08:00
JimMoen 15b3f4deb0
fix: rm unused func and exports 2024-08-01 15:00:24 +08:00
JimMoen 7a251c9ead
test: handle frame error for CONNECT packets 2024-08-01 10:26:31 +08:00
JimMoen 37a89d0094
fix: enrich parse_state and connection serialize opts 2024-08-01 10:26:31 +08:00
JimMoen c313aa89f0
fix: try throw proto_ver and proto_name when parsing CONNECT packet 2024-08-01 10:26:31 +08:00
JimMoen 6db1c0a446
refactor: separate function to handle `frame_error` 2024-08-01 10:26:31 +08:00
JimMoen d4508a4f1d
chore: sync master `elvis.config` 2024-08-01 10:26:31 +08:00
Thales Macedo Garitezi a6a9538e73 refactor: move JWT worker and helpers to separate app
Some bridge applications might need to use JWTs before the `emqx_connector` is started, so
we must move JWT table initialization to a separate dependency application.
2024-07-31 14:52:12 -03:00
Thales Macedo Garitezi 9f97bff7d0 feat: expose `resource_opts.query_mode` for pulsar action
Fixes https://emqx.atlassian.net/browse/EMQX-12782
2024-07-31 11:13:11 -03:00
Ivan Dyachkov 577f1a7d8a
Merge pull request #13553 from id/20240731-ci-fix-docker-build
ci: fix docker images build
2024-07-31 16:04:47 +02:00
Ivan Dyachkov e42021d314
Merge pull request #13554 from id/20240731-sync-release-57
sync release-57
2024-07-31 15:48:37 +02:00
Thales Macedo Garitezi 08c58cc319
Merge pull request #13543 from thalesmg/20240730-r57-sr-delete-protobuf-cache
fix(schema registry): clear protobuf code cache when deleting/updating serdes
2024-07-31 10:16:48 -03:00
Thales Macedo Garitezi 150fee87f1
Merge pull request #13541 from thalesmg/20240730-r57-unset-crl-check-listener
fix(crl): force remove CRL fields from SSL opts after listener update
2024-07-31 10:16:35 -03:00
ieQu1 6058b50c91
Merge pull request #13555 from ieQu1/ds-rest-404
fix(mgmt): Return 404 for /ds/ API endpoints when DS is disabled
2024-07-31 14:57:17 +02:00
Thales Macedo Garitezi 85cff5e7eb fix: merge conflicts 2024-07-31 09:14:29 -03:00
ieQu1 569f48f5a1
fix(mgmt): Return 404 for /ds/ API endpoints when DS is disabled 2024-07-31 13:44:38 +02:00
ieQu1 2cf86e76ee
Merge pull request #13551 from ieQu1/EMQX-12587
fix(sessds): Expose durable sessions in the config API
2024-07-31 12:00:26 +02:00
Ivan Dyachkov 74cef7937d Merge remote-tracking branch 'upstream/release-57' into 20240731-sync-release-57 2024-07-31 11:31:29 +02:00
JimMoen c658cfe269
fix: make static_check happy 2024-07-31 17:17:13 +08:00
JimMoen a246551914
fix: add a startup timeout limit for the plugin application 2024-07-31 17:17:11 +08:00
JimMoen b1c8bc2421
Merge pull request #13548 from JimMoen/feat-plugin-on-config-changed-callback
feat: call plugin's app module `on_config_changed/2` callback
2024-07-31 16:40:48 +08:00
ieQu1 200b5ab294
Merge pull request #13550 from ieQu1/no-ra-dependency-on-oss
chore(emqx): Remove ra from the list of EMQX dependencies
2024-07-31 10:30:44 +02:00
Ivan Dyachkov 8d8ff6cf5d ci: fix docker images build
/etc/docker/daemon.json requires root for read access
2024-07-31 10:27:04 +02:00
ieQu1 a23b8266b1
fix(sessds): Expose durable sessions in the config API 2024-07-31 10:18:38 +02:00
ieQu1 d69342a2fc
chore(emqx): Remove ra from the list of EMQX dependencies 2024-07-31 09:56:28 +02:00
JimMoen e6bfc14cc9
fix: try-catch optional `on_config_changed/2` plugin app callback 2024-07-31 10:09:02 +08:00
JimMoen 3d1f0c756c
feat: call plugin's app module `on_config_changed/2` callback
assume the module: `[PluginName]_app`
2024-07-31 10:09:02 +08:00
Thales Macedo Garitezi 83041a8b83
Merge pull request #13544 from thalesmg/20240730-m-test-flaky-client-v2
test(clients v2 api): attempt to reduce flakiness
2024-07-30 16:07:37 -03:00
Thales Macedo Garitezi 1c4402b12c test(clients v2 api): attempt to reduce flakiness
https://github.com/emqx/emqx/actions/runs/10161391242/job/28101183920#step:6:331
2024-07-30 14:07:08 -03:00
Thales Macedo Garitezi ebb69f4ebf fix(crl): force remove crl fields from SSL opts after listener update
Fixes https://emqx.atlassian.net/browse/EMQX-12785
2024-07-30 14:00:24 -03:00
Thales Macedo Garitezi fd961f9da7 fix(schema registry): clear protobuf code cache when deleting/updating serde
Fixes https://emqx.atlassian.net/browse/EMQX-12789
2024-07-30 13:52:34 -03:00
Ilia Averianov 359bc38aa4
Merge pull request #13407 from savonarola/0701-shared-sub
Implement shared subscriptions
2024-07-30 16:12:13 +03:00
Ilya Averyanov 08f70e4a25 feat(queue): move ds shared sub dependent test to emqx_ds_shared_sub app 2024-07-30 14:19:39 +03:00
Ilya Averyanov e408804efb feat(queue): fix dialyzer issues 2024-07-30 13:01:48 +03:00
Ilya Averyanov e294d35703 feat(queue): add schema descriptions 2024-07-30 13:01:48 +03:00
Ilya Averyanov 303ff95e10 feat(queue): add stub for CRUD API 2024-07-30 13:01:48 +03:00
Ilya Averyanov 23f0e88b45 feat(queue): add integration with external broker 2024-07-30 13:01:46 +03:00
Ilya Averyanov f0dd1bc4f4 feat(queue): add shared sub support to the management API 2024-07-30 13:01:20 +03:00
Ilya Averyanov 9b30320ddb feat(queue): simplify progress report on disconnect 2024-07-30 13:01:20 +03:00
Ilya Averyanov cae27293a5 feat(queue): move route registration to sessions 2024-07-30 13:01:19 +03:00
Ilya Averyanov 81f4103d60 feat(queue): avoid cyclic dependencies 2024-07-30 13:01:19 +03:00
Ilya Averyanov bab526be24 feat(queue): self-revoke all shared streams on session open 2024-07-30 13:01:19 +03:00
Ilya Averyanov 9307a82004 feat(queue): rearrange leader's code 2024-07-30 13:01:19 +03:00
Ilya Averyanov b8e8f7c8e0 feat(queue): add pre_renew_streams callback 2024-07-30 13:01:18 +03:00
Ilya Averyanov a97a0d6400 feat(queue): fix dialyzer issues 2024-07-30 13:01:18 +03:00
Ilya Averyanov 8705956cdc feat(queue): update docs 2024-07-30 13:01:18 +03:00
Ilya Averyanov f213569460 feat(queue): clarify naming; identify shared subs by full topic filter 2024-07-30 13:01:18 +03:00
Ilya Averyanov 7e23f8d19f feat(queue): fix include 2024-07-30 13:01:17 +03:00
Ilya Averyanov a676ede6b8 feat(queue): improve logging 2024-07-30 13:01:17 +03:00
Ilya Averyanov 9e5e7a23c5 feat(queue): remove unnecessary acked flag 2024-07-30 13:01:17 +03:00
Ilya Averyanov 143086b0ef feat(queue): replace invalid rewing algorithm with skipping iterator 2024-07-30 13:01:16 +03:00
Ilya Averyanov c569625dd1 feat(queue): handle partially unacked ranges 2024-07-30 13:01:16 +03:00
Ilya Averyanov 7daab1ab23 feat(queue): move replay progress to a separate data structure 2024-07-30 13:01:16 +03:00
Ilya Averyanov 077ee38530 feat(queue): add config 2024-07-30 13:01:15 +03:00
Ilya Averyanov b74189570d feat(queue): do not use ee app from emqx app 2024-07-30 13:01:15 +03:00
Ilya Averyanov 649cf88042 feat(queue): kick agents that do not return to the replaying state for long 2024-07-30 13:01:15 +03:00
Ilya Averyanov 1496f7f778 feat(queue): add leader_rank_progress test 2024-07-30 13:01:15 +03:00
Ilya Averyanov 91dd1183ad feat(queue): fix dialyzer issues 2024-07-30 13:01:14 +03:00
Ilya Averyanov 65ab81ff74 feat(queue): fix quick resubscription 2024-07-30 13:01:14 +03:00
Ilya Averyanov 53d4cd3174 feat(queue): rename leader' stream_progresses to stream_states 2024-07-30 13:01:14 +03:00
Ilya Averyanov 7d004b37da feat(queue): implement stream finalization 2024-07-30 13:01:13 +03:00
Ilya Averyanov e5547005eb feat(queue): implement resubscribe test 2024-07-30 13:01:13 +03:00
Ilya Averyanov fada2a3fea feat(queue): reorganize and document shared subs module 2024-07-30 13:01:13 +03:00
Ilya Averyanov b4a010d63b feat(queue): implement unsubscribe 2024-07-30 13:01:13 +03:00
Ilya Averyanov 9bde981c44 feat(queue): fix static check issues 2024-07-30 13:01:12 +03:00
Ilya Averyanov 7658e081c5 feat(queue): move design docs to the EIP 2024-07-30 13:01:12 +03:00
Ilya Averyanov 8dce530d15 feat(queue): fix progress reporting and more tests
We test reassignment during the intensive replay
2024-07-30 13:01:12 +03:00
Ilya Averyanov a20d262327 feat(queue): send progress before fetching new messages 2024-07-30 13:01:11 +03:00
Ilya Averyanov d32f282feb feat(queue): add graceful disconnect 2024-07-30 13:01:11 +03:00
Ilya Averyanov 1d728a05b2 feat(queue): send metadata with agent when connecting to leader
It will be used to attach agent taints to improve stream assignment.
2024-07-30 13:01:11 +03:00
Ilya Averyanov 49bff5c08a feat(queue): wrap remote calls in a proto 2024-07-30 13:01:10 +03:00
Ilya Averyanov 61eda0ff31 feat(queue): identify agents by SessionId in tests 2024-07-30 13:01:10 +03:00
Ilya Averyanov 8f0d807c00 feat(queue): add new test scenarios 2024-07-30 13:01:10 +03:00
Ilya Averyanov bceb5d43ed feat(queue): fix stream rebalancing issues, update tests 2024-07-30 13:01:10 +03:00
Ilya Averyanov 03fea34962 feat(queue): document protocol between agent and leader
Document leader's states
2024-07-30 13:01:09 +03:00
Ilya Averyanov 082514f557 feat(queue): implement full protocol between agent and leader 2024-07-30 13:01:09 +03:00
Ilya Averyanov c831f0772f feat(queue): handle renew_lease_timeout 2024-07-30 13:01:09 +03:00
Ivan Dyachkov ca455ad992
Merge pull request #13532 from emqx/sync-release-57-20240729-021938
Sync release-57
2024-07-30 10:49:09 +02:00
JianBo He c347c2c285
Merge pull request #13540 from zmstone/0729-rule-funciton-getenv-should-be-limited-to-vars-with-prefix-EMQXVAR_
refactor: force getenv to access only OS env with prefix EMQXVAR_
2024-07-30 08:23:24 +08:00
zmstone a49cd78aae refactor: force getenv to access only OS env with prefix EMQXVAR_ 2024-07-29 23:54:00 +02:00
zmstone 4065158be7
Merge pull request #13534 from JimMoen/feat-add-superuser-skip-authz
feat: add authz skipped trace for superuser
2024-07-29 22:30:13 +02:00
ieQu1 18721d05bc
Merge pull request #13526 from ieQu1/replicant-ee
fix(mria): Reserve replicant role for EE only
2024-07-29 22:10:06 +02:00
zmstone 7f7d0741d2
Merge pull request #13528 from zmstone/0726-unrecoverable-error-limit
0726 unrecoverable error limit
2024-07-29 21:32:14 +02:00
zmstone 2e39c4ad5e
Merge pull request #13495 from thalesmg/20240719-m-hide-enable-fields-mkII
chore: hide enable flags from schema and config examples
2024-07-29 21:31:20 +02:00
zmstone 5b50d5433a
Merge pull request #13537 from thalesmg/20240729-r57-auto-decode-payload-kprodu
feat: attempt to automatically decode `payload` similar to key and message templates
2024-07-29 21:03:41 +02:00
zmstone eab440e0c1 docs: add changelog for PR 13528 2024-07-29 20:41:57 +02:00
zmstone e08425e67d refactor(log-throttler): remove unnecessary code
there is no need to reset counters before erasing
2024-07-29 20:41:27 +02:00
zhongwencool f6f1d32da0 feat: throttle with resource_id 2024-07-29 20:41:27 +02:00
zhongwencool 2924ec582a feat: add unrecoverable_resource_error throttle 2024-07-29 20:41:27 +02:00
zhongwencool 8dc1d1424a chore: add resource tag for log 2024-07-29 20:41:27 +02:00
Thales Macedo Garitezi 693d5dd394 feat: attempt to automatically decode `payload` similar to key and message templates 2024-07-29 13:01:06 -03:00
ieQu1 f85db0a0e9
fix: Apply suggestions from code review
Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
2024-07-29 17:22:41 +02:00
lafirest 60aefd1065
Merge pull request #13520 from lafirest/feat/scram-rest-acl
feat(scram): supports ACL rules in `scram_restapi` backend
2024-07-29 22:46:50 +08:00
JianBo He c637422302
Merge pull request #13518 from thalesmg/20240724-r57-dynamic-kprodu-action-mkIII
feat(kafka producer): allow dynamic topics (mkIII)
2024-07-29 22:43:20 +08:00
Thales Macedo Garitezi e80d43d14d test(fix): use ebin path without plugins
Without the filtering that already exists in cth:ebin_path, the rebar3 plugins path may
take priority over normal dependencies.  Since we just updated hocon, and there seems to
be an older hocon among the rebar3 plugins, it started to break the test because older
hocon was getting loaded in the peer.
2024-07-29 09:50:25 -03:00
Thales Macedo Garitezi b3074144cc chore: temporarily revert `NO_DOC` changes to fields with default value = false
These will be dealt with in follow up PRs, by allowing the parent struct to be set to a
special `disabled` value in such cases.
2024-07-29 09:50:25 -03:00
Thales Macedo Garitezi 6786c9b517
refactor: improve descriptions and identifiers
Co-authored-by: zmstone <zmstone@gmail.com>
2024-07-29 09:45:52 -03:00
Thales Macedo Garitezi 8913de10c0
Merge pull request #13527 from thalesmg/20240726-r57-multiple-froms-sql-test
fix(rule engine tester): fix message publish with bridge source in from clause
2024-07-29 09:37:17 -03:00
JimMoen 5ddd7d7a6a
test: superuser skipped all authz check 2024-07-29 17:27:36 +08:00
JimMoen d7cac74bed
feat: add authz skipped trace 2024-07-29 17:27:36 +08:00
JianBo He 0b0a28ae44
chore: update changes/ee/feat-13504.en.md
Co-authored-by: zmstone <zmstone@gmail.com>
2024-07-29 10:45:24 +08:00
id c1e2801f41 Merge remote-tracking branch 'origin/release-57' into sync-release-57-20240729-021938 2024-07-29 02:19:38 +00:00
ieQu1 8036baf22c
test(paho): Run RLOG paho test with replicants only on EE 2024-07-26 21:53:32 +02:00
ieQu1 268f887700
test(mgmt): Disable certain tests on OSS 2024-07-26 20:24:53 +02:00
Thales Macedo Garitezi 1d56ac6e5e refactor: change topic schema type 2024-07-26 14:26:21 -03:00
Thales Macedo Garitezi 4e0742c66f feat: make kafka producer freely dynamic 2024-07-26 14:25:20 -03:00
ieQu1 8c1302f455
test(conf_app): Remove redundand config 2024-07-26 17:25:09 +02:00
ieQu1 b8a2a8ea18
test(router): Skip certain tests on OSS 2024-07-26 17:25:09 +02:00
ieQu1 b7c424a13d
test(persmsg): Remove redundant config 2024-07-26 17:17:06 +02:00
ieQu1 1b6494ab9a
test(mgmt): Remove redundant config 2024-07-26 17:17:06 +02:00
ieQu1 41bf5cd6ca
test(otel): Remove redundant config 2024-07-26 17:17:06 +02:00
ieQu1 548bcceab7
test(auth): Remove redundant config 2024-07-26 17:17:06 +02:00
ieQu1 1beda1cd11
test(mria): Remove role from the example config 2024-07-26 17:17:06 +02:00
ieQu1 9da744c423
fix(mria): Reserve replicant role for EE only 2024-07-26 17:17:06 +02:00
lafirest b2f2af6871
Merge pull request #13523 from lafirest/fix/oidc
fix(oidc): fixed update and callback errors for OIDC
2024-07-26 21:09:11 +08:00
Thales Macedo Garitezi 3fae704903 fix(rule engine tester): fix message publish with bridge source in from clause
Fixes https://emqx.atlassian.net/browse/EMQX-12762
2024-07-26 09:27:16 -03:00
lafirest 2d6b2bff8e
Merge pull request #13524 from lafirest/feat/exclusive-cli
feat(exclusive): added CLI interface for exclusive topics
2024-07-26 19:54:03 +08:00
firest dc342a35ac chore: update changes 2024-07-26 17:18:52 +08:00
firest 397c104a85 feat(exclusive): added CLI interface for exclusive topics 2024-07-26 16:56:47 +08:00
firest 49b24a3049 fix(oidc): fixed update and callback errors for OIDC 2024-07-26 15:41:22 +08:00
firest 7bf70aaab6 feat(scram): supports ACL rules in `scram_restapi` backend 2024-07-26 14:30:28 +08:00
zmstone 9a5d50f26a
Merge pull request #13521 from zmstone/0725-add-ldap-reconnect-on-timeout
0725 add ldap reconnect on timeout
2024-07-26 08:29:02 +02:00
Thales Macedo Garitezi df1f4fad70 feat(kafka producer): allow dynamic topics from pre-configured topics
Fixes https://emqx.atlassian.net/browse/EMQX-12656
2024-07-25 15:33:12 -03:00
Thales Macedo Garitezi 33eccb35da chore: update wolff -> 3.0.2 2024-07-25 14:30:50 -03:00
zmstone f6a0f56771 docs: add changelog for PR 13521 2024-07-25 19:24:52 +02:00
zmstone 7631420eef test: add test case to cover ldap search timeout 2024-07-25 18:43:37 +02:00
zmstone 8f94e9684c fix: handle ldap seqrch error 2024-07-25 18:42:09 +02:00
zmstone 43f799508a chore: add ldap test doc 2024-07-25 18:42:08 +02:00
lafirest 1925ed2f55
Merge pull request #13507 from lafirest/feat/env_func
feat(variform): add a builtin function to get env vars
2024-07-25 22:46:50 +08:00
lafirest a45f817f0e
Merge pull request #13515 from lafirest/fix/exclusive
fix(exclusive): allow the same client to resubscribe to an existing exclusive topic
2024-07-25 21:26:08 +08:00
firest 57959ac7d4 chore: update changes 2024-07-25 18:59:40 +08:00
firest 79020b2436 feat(variform): add a builtin function to get env vars 2024-07-25 18:50:53 +08:00
firest 141d8144e4 fix(scram): change the name from `scram_http` to `scram_restapi` 2024-07-25 17:01:49 +08:00
firest 4f21594707 chore: update changes 2024-07-25 09:40:20 +08:00
firest 117c8197d7 fix(exclusive): allow the same client to resubscribe to an existing exclusive topic 2024-07-25 09:40:15 +08:00
Thales Macedo Garitezi c728b98e79
Merge pull request #13510 from thalesmg/20240723-r57-fix-jwt-about-to-expire-check
fix(jwt): fix grace period for renewal check
2024-07-24 16:52:35 -03:00
Thales Macedo Garitezi 9e65e0d048
Merge pull request #13503 from thalesmg/20240722-r57-resource-manager-hc-interval-startup
fix(connector resource): use configuration `resource_opts` for health check interval when starting up
2024-07-24 09:15:47 -03:00
Thales Macedo Garitezi 7374123c5c fix(jwt): fix grace period for renewal check 2024-07-23 17:25:29 -03:00
Thales Macedo Garitezi d7112921a6 docs: remove `enable` from config examples
Fixes https://emqx.atlassian.net/browse/EMQX-12730
2024-07-22 13:26:53 -03:00
Thales Macedo Garitezi 69f5b6fa6c chore: hide `enable` fields from docgen
Fixes https://emqx.atlassian.net/browse/EMQX-12730
2024-07-22 13:26:53 -03:00
Thales Macedo Garitezi 8ae54ac325 fix(connector resource): use configuration `resource_opts` for health check interval when starting up
Fixes https://emqx.atlassian.net/browse/EMQX-12738
2024-07-22 11:34:10 -03:00
Thales Macedo Garitezi 65544f34ec chore: bump hocon -> 0.43.2 2024-07-19 17:25:18 -03:00
308 changed files with 7902 additions and 2053 deletions

View File

@ -10,7 +10,7 @@ services:
nofile: 1024
image: openldap
#ports:
# - 389:389
# - "389:389"
volumes:
- ./certs/ca.crt:/etc/certs/ca.crt
restart: always

View File

@ -0,0 +1,61 @@
# LDAP authentication
To run manual tests with the default docker-compose files.
Expose openldap container port by uncommenting the `ports` config in `docker-compose-ldap.yaml `
To start openldap:
```
docker-compose -f ./.ci/docker-compose-file/docker-compose.yaml -f ./.ci/docker-compose-file/docker-compose-ldap.yaml up -docker
```
## LDAP database
LDAP database is populated from below files:
```
apps/emqx_ldap/test/data/emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
apps/emqx_ldap/test/data/emqx.schema /usr/local/etc/openldap/schema/emqx.schema
```
## Minimal EMQX config
```
authentication = [
{
backend = ldap
base_dn = "uid=${username},ou=testdevice,dc=emqx,dc=io"
filter = "(& (objectClass=mqttUser) (uid=${username}))"
mechanism = password_based
method {
is_superuser_attribute = isSuperuser
password_attribute = userPassword
type = hash
}
password = public
pool_size = 8
query_timeout = "5s"
request_timeout = "10s"
server = "localhost:1389"
username = "cn=root,dc=emqx,dc=io"
}
]
```
## Example ldapsearch command
```
ldapsearch -x -H ldap://localhost:389 -D "cn=root,dc=emqx,dc=io" -W -b "uid=mqttuser0007,ou=testdevice,dc=emqx,dc=io" "(&(objectClass=mqttUser)(uid=mqttuser0007))"
```
## Example mqttx command
The client password hashes are generated from their username.
```
# disabled user
mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0006 -P mqttuser0006
# enabled super-user
mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0007 -P mqttuser0007
```

View File

@ -51,7 +51,7 @@ runs:
echo "SELF_HOSTED=false" >> $GITHUB_OUTPUT
;;
esac
- uses: actions/cache@ab5e6d0c87105b4c9c2047343972218f562e4319 # v4.0.1
- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
id: cache
if: steps.prepare.outputs.SELF_HOSTED != 'true'
with:

View File

@ -152,7 +152,7 @@ jobs:
echo "PROFILE=${PROFILE}" | tee -a .env
echo "PKG_VSN=$(./pkg-vsn.sh ${PROFILE})" | tee -a .env
zip -ryq -x@.github/workflows/.zipignore $PROFILE.zip .
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.profile }}
path: ${{ matrix.profile }}.zip

View File

@ -163,7 +163,7 @@ jobs:
echo "PROFILE=${PROFILE}" | tee -a .env
echo "PKG_VSN=$(./pkg-vsn.sh ${PROFILE})" | tee -a .env
zip -ryq -x@.github/workflows/.zipignore $PROFILE.zip .
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.profile }}
path: ${{ matrix.profile }}.zip

View File

@ -83,7 +83,7 @@ jobs:
id: build
run: |
make ${{ matrix.profile }}-tgz
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ matrix.profile }}-${{ matrix.arch }}.tar.gz"
path: "_packages/emqx*/emqx-*.tar.gz"
@ -110,7 +110,7 @@ jobs:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
ref: ${{ github.event.inputs.ref }}
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
pattern: "${{ matrix.profile[0] }}-*.tar.gz"
path: _packages
@ -122,24 +122,25 @@ jobs:
run: |
ls -lR _packages/$PROFILE
mv _packages/$PROFILE/*.tar.gz ./
- name: Enable containerd image store on Docker Engine
run: |
echo "$(jq '. += {"features": {"containerd-snapshotter": true}}' /etc/docker/daemon.json)" > daemon.json
echo "$(sudo cat /etc/docker/daemon.json | jq '. += {"features": {"containerd-snapshotter": true}}')" > daemon.json
sudo mv daemon.json /etc/docker/daemon.json
sudo systemctl restart docker
- uses: docker/setup-qemu-action@68827325e0b33c7199eb31dd4e31fbe9023e06e3 # v3.0.0
- uses: docker/setup-buildx-action@d70bba72b1f3fd22344832f00baa16ece964efeb # v3.3.0
- uses: docker/setup-qemu-action@49b3bc8e6bdd4a60e6116a5414239cba5943d3cf # v3.2.0
- uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1
- name: Login to hub.docker.com
uses: docker/login-action@0d4c9c5ea7693da7b068278f7b52bda2a190a446 # v3.2.0
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
if: inputs.publish && contains(matrix.profile[1], 'docker.io')
with:
username: ${{ secrets.DOCKER_HUB_USER }}
password: ${{ secrets.DOCKER_HUB_TOKEN }}
- name: Login to AWS ECR
uses: docker/login-action@0d4c9c5ea7693da7b068278f7b52bda2a190a446 # v3.2.0
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
if: inputs.publish && contains(matrix.profile[1], 'public.ecr.aws')
with:
registry: public.ecr.aws

View File

@ -51,7 +51,7 @@ jobs:
if: always()
run: |
docker save $_EMQX_DOCKER_IMAGE_TAG | gzip > $EMQX_NAME-docker-$PKG_VSN.tar.gz
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ env.EMQX_NAME }}-docker"
path: "${{ env.EMQX_NAME }}-docker-${{ env.PKG_VSN }}.tar.gz"

View File

@ -95,7 +95,7 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: ${{ matrix.profile }}-${{ matrix.os }}-${{ matrix.otp }}
@ -180,7 +180,7 @@ jobs:
--builder $BUILDER \
--elixir $IS_ELIXIR \
--pkgtype pkg
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.profile }}-${{ matrix.os }}-${{ matrix.arch }}${{ matrix.with_elixir == 'yes' && '-elixir' || '' }}-${{ matrix.builder }}-${{ matrix.otp }}-${{ matrix.elixir }}
path: _packages/${{ matrix.profile }}/
@ -198,7 +198,7 @@ jobs:
profile:
- ${{ inputs.profile }}
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
pattern: "${{ matrix.profile }}-*"
path: packages/${{ matrix.profile }}

View File

@ -23,6 +23,7 @@ jobs:
profile:
- ['emqx', 'master']
- ['emqx', 'release-57']
- ['emqx', 'release-58']
os:
- ubuntu22.04
- amzn2023
@ -53,7 +54,7 @@ jobs:
- name: build pkg
run: |
./scripts/buildx.sh --profile "$PROFILE" --pkgtype pkg --builder "$BUILDER"
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: ${{ matrix.profile[0] }}-${{ matrix.profile[1] }}-${{ matrix.os }}
@ -101,7 +102,7 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: ${{ matrix.profile }}-${{ matrix.os }}

View File

@ -41,13 +41,13 @@ jobs:
- name: build pkg
run: |
./scripts/buildx.sh --profile $PROFILE --pkgtype pkg --elixir $ELIXIR --arch $ARCH
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ matrix.profile[0] }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"
path: _packages/${{ matrix.profile[0] }}/*
retention-days: 7
compression-level: 0
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ matrix.profile[0] }}-schema-dump-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"
path: |
@ -84,7 +84,7 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.os }}
path: _packages/**/*

View File

@ -37,7 +37,7 @@ jobs:
- run: ./scripts/check-elixir-deps-discrepancies.exs
- run: ./scripts/check-elixir-applications.exs
- name: Upload produced lock files
uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: ${{ matrix.profile }}_produced_lock_files

View File

@ -24,6 +24,7 @@ jobs:
branch:
- master
- release-57
- release-58
language:
- cpp
- python

View File

@ -24,6 +24,7 @@ jobs:
ref:
- master
- release-57
- release-58
steps:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:

View File

@ -52,7 +52,7 @@ jobs:
id: package_file
run: |
echo "PACKAGE_FILE=$(find _packages/emqx -name 'emqx-*.deb' | head -n 1 | xargs basename)" >> $GITHUB_OUTPUT
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: emqx-ubuntu20.04
path: _packages/emqx/${{ steps.package_file.outputs.PACKAGE_FILE }}
@ -77,7 +77,7 @@ jobs:
repository: emqx/tf-emqx-performance-test
path: tf-emqx-performance-test
ref: v0.2.3
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-ubuntu20.04
path: tf-emqx-performance-test/
@ -113,13 +113,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform
@ -148,7 +148,7 @@ jobs:
repository: emqx/tf-emqx-performance-test
path: tf-emqx-performance-test
ref: v0.2.3
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-ubuntu20.04
path: tf-emqx-performance-test/
@ -184,13 +184,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform
@ -220,7 +220,7 @@ jobs:
repository: emqx/tf-emqx-performance-test
path: tf-emqx-performance-test
ref: v0.2.3
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-ubuntu20.04
path: tf-emqx-performance-test/
@ -257,13 +257,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform
@ -294,7 +294,7 @@ jobs:
repository: emqx/tf-emqx-performance-test
path: tf-emqx-performance-test
ref: v0.2.3
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-ubuntu20.04
path: tf-emqx-performance-test/
@ -330,13 +330,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform

View File

@ -25,7 +25,7 @@ jobs:
- emqx
- emqx-enterprise
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ matrix.profile }}
- name: extract artifact
@ -40,7 +40,7 @@ jobs:
if: failure()
run: |
cat _build/${{ matrix.profile }}/rel/emqx/log/erlang.log.*
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: conftest-logs-${{ matrix.profile }}

View File

@ -35,7 +35,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh "$EMQX_NAME")
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ env.EMQX_NAME }}-docker
path: /tmp
@ -69,7 +69,6 @@ jobs:
shell: bash
env:
EMQX_NAME: ${{ matrix.profile }}
_EMQX_TEST_DB_BACKEND: ${{ matrix.cluster_db_backend }}
strategy:
fail-fast: false
@ -78,18 +77,20 @@ jobs:
- emqx
- emqx-enterprise
- emqx-elixir
cluster_db_backend:
- mnesia
- rlog
steps:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- name: Set up environment
id: env
run: |
source env.sh
if [ "$EMQX_NAME" = "emqx-enterprise" ]; then
_EMQX_TEST_DB_BACKEND='rlog'
else
_EMQX_TEST_DB_BACKEND='mnesia'
fi
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh "$EMQX_NAME")
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ env.EMQX_NAME }}-docker
path: /tmp

View File

@ -95,7 +95,7 @@ jobs:
echo "Suites: $SUITES"
./rebar3 as standalone_test ct --name 'test@127.0.0.1' -v --readable=true --suite="$SUITES"
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: logs-emqx-app-tests-${{ matrix.type }}

View File

@ -44,7 +44,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh "$EMQX_NAME")
echo "EMQX_TAG=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: "${{ env.EMQX_NAME }}-docker"
path: /tmp

View File

@ -31,7 +31,7 @@ jobs:
else
wget --no-verbose --no-check-certificate -O /tmp/apache-jmeter.tgz $ARCHIVE_URL
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: apache-jmeter.tgz
path: /tmp/apache-jmeter.tgz
@ -58,7 +58,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-docker
path: /tmp
@ -95,7 +95,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-advanced_feat-${{ matrix.scripts_type }}
@ -127,7 +127,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-docker
path: /tmp
@ -175,7 +175,7 @@ jobs:
if: failure()
run: |
docker compose -f .ci/docker-compose-file/docker-compose-emqx-cluster.yaml logs --no-color > ./jmeter_logs/emqx.log
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-pgsql_authn_authz-${{ matrix.scripts_type }}_${{ matrix.pgsql_tag }}
@ -204,7 +204,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-docker
path: /tmp
@ -248,7 +248,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-mysql_authn_authz-${{ matrix.scripts_type }}_${{ matrix.mysql_tag }}
@ -273,7 +273,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-docker
path: /tmp
@ -313,7 +313,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-JWT_authn-${{ matrix.scripts_type }}
@ -339,7 +339,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-docker
path: /tmp
@ -370,7 +370,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-built_in_database_authn_authz-${{ matrix.scripts_type }}

View File

@ -25,7 +25,7 @@ jobs:
run:
shell: bash
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-enterprise
- name: extract artifact
@ -45,7 +45,7 @@ jobs:
run: |
export PROFILE='emqx-enterprise'
make emqx-enterprise-tgz
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
name: Upload built emqx and test scenario
with:
name: relup_tests_emqx_built
@ -72,7 +72,7 @@ jobs:
run:
shell: bash
steps:
- uses: erlef/setup-beam@a6e26b22319003294c58386b6f25edbc7336819a # v1.18.0
- uses: erlef/setup-beam@b9c58b0450cd832ccdb3c17cc156a47065d2114f # v1.18.1
with:
otp-version: 26.2.5
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
@ -88,7 +88,7 @@ jobs:
./configure
make
echo "$(pwd)/bin" >> $GITHUB_PATH
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
name: Download built emqx and test scenario
with:
name: relup_tests_emqx_built
@ -111,7 +111,7 @@ jobs:
docker logs node2.emqx.io | tee lux_logs/emqx2.log
exit 1
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
name: Save debug data
if: failure()
with:

View File

@ -46,7 +46,7 @@ jobs:
contents: read
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ matrix.profile }}
@ -90,7 +90,7 @@ jobs:
contents: read
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ matrix.profile }}
- name: extract artifact
@ -133,7 +133,7 @@ jobs:
if: failure()
run: tar -czf logs.tar.gz _build/test/logs
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: logs-${{ matrix.profile }}-${{ matrix.prefix }}-sg${{ matrix.suitegroup }}
@ -164,7 +164,7 @@ jobs:
CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-sg${{ matrix.suitegroup }}
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ matrix.profile }}
- name: extract artifact
@ -193,7 +193,7 @@ jobs:
if: failure()
run: tar -czf logs.tar.gz _build/test/logs
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: logs-${{ matrix.profile }}-${{ matrix.prefix }}-sg${{ matrix.suitegroup }}

View File

@ -30,7 +30,7 @@ jobs:
persist-credentials: false
- name: "Run analysis"
uses: ossf/scorecard-action@dc50aa9510b46c811795eb24b2f1ba02a914e534 # v2.3.3
uses: ossf/scorecard-action@62b2cac7ed8198b15735ed49ab1e5cf35480ba46 # v2.4.0
with:
results_file: results.sarif
results_format: sarif
@ -40,7 +40,7 @@ jobs:
publish_results: true
- name: "Upload artifact"
uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: SARIF file
path: results.sarif

View File

@ -19,7 +19,7 @@ jobs:
- emqx-enterprise
runs-on: ${{ endsWith(github.repository, '/emqx') && 'ubuntu-22.04' || fromJSON('["self-hosted","ephemeral","linux","x64"]') }}
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
pattern: "${{ matrix.profile }}-schema-dump-*-x64"
merge-multiple: true

View File

@ -30,7 +30,7 @@ jobs:
include: ${{ fromJson(inputs.ct-matrix) }}
container: "${{ inputs.builder }}"
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ matrix.profile }}
- name: extract artifact

View File

@ -34,7 +34,7 @@ jobs:
pull-requests: write
steps:
- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
fetch-depth: 0

View File

@ -10,8 +10,8 @@ include env.sh
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.9.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.7.1
export EMQX_DASHBOARD_VERSION ?= v1.10.0-beta.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.8.0-beta.1
export EMQX_RELUP ?= true
export EMQX_REL_FORM ?= tgz

View File

@ -65,9 +65,20 @@
%% Route
%%--------------------------------------------------------------------
-record(share_dest, {
session_id :: emqx_session:session_id(),
group :: emqx_types:group()
}).
-record(route, {
topic :: binary(),
dest :: node() | {binary(), node()} | emqx_session:session_id() | emqx_external_broker:dest()
dest ::
node()
| {binary(), node()}
| emqx_session:session_id()
%% One session can also have multiple subscriptions to the same topic through different groups
| #share_dest{}
| emqx_external_broker:dest()
}).
%%--------------------------------------------------------------------

View File

@ -683,6 +683,7 @@ end).
-define(FRAME_PARSE_ERROR, frame_parse_error).
-define(FRAME_SERIALIZE_ERROR, frame_serialize_error).
-define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})).
-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})).

View File

@ -32,7 +32,7 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.7.1").
-define(EMQX_RELEASE_CE, "5.8.0-alpha.1").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.7.1").
-define(EMQX_RELEASE_EE, "5.8.0-alpha.1").

View File

@ -41,16 +41,20 @@
).
%% NOTE: do not forget to use atom for msg and add every used msg to
%% the default value of `log.thorttling.msgs` list.
%% the default value of `log.throttling.msgs` list.
-define(SLOG_THROTTLE(Level, Data),
?SLOG_THROTTLE(Level, Data, #{})
).
-define(SLOG_THROTTLE(Level, Data, Meta),
?SLOG_THROTTLE(Level, undefined, Data, Meta)
).
-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta),
case logger:allow(Level, ?MODULE) of
true ->
(fun(#{msg := __Msg} = __Data) ->
case emqx_log_throttler:allow(__Msg) of
case emqx_log_throttler:allow(__Msg, UniqueKey) of
true ->
logger:log(Level, __Data, Meta);
false ->
@ -87,7 +91,7 @@
?_DO_TRACE(Tag, Msg, Meta),
?SLOG(
Level,
(emqx_trace_formatter:format_meta_map(Meta))#{msg => Msg, tag => Tag},
(Meta)#{msg => Msg, tag => Tag},
#{is_trace => false}
)
end).

View File

@ -27,6 +27,7 @@
{emqx_ds,2}.
{emqx_ds,3}.
{emqx_ds,4}.
{emqx_ds_shared_sub,1}.
{emqx_eviction_agent,1}.
{emqx_eviction_agent,2}.
{emqx_eviction_agent,3}.

View File

@ -28,15 +28,14 @@
{lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
{ra, "2.7.3"}
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}}
]}.
{plugins, [{rebar3_proper, "0.12.1"}, rebar3_path_deps]}.

View File

@ -2,7 +2,7 @@
{application, emqx, [
{id, "emqx"},
{description, "EMQX Core"},
{vsn, "5.3.3"},
{vsn, "5.3.4"},
{modules, []},
{registered, []},
{applications, [

View File

@ -146,7 +146,9 @@
-type replies() :: emqx_types:packet() | reply() | [reply()].
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
-define(IS_CONNECTED_OR_REAUTHENTICATING(ConnState),
((ConnState == connected) orelse (ConnState == reauthenticating))
).
-define(IS_COMMON_SESSION_TIMER(N),
((N == retry_delivery) orelse (N == expire_awaiting_rel))
).
@ -337,7 +339,7 @@ take_conn_info_fields(Fields, ClientInfo, ConnInfo) ->
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}.
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
@ -567,29 +569,8 @@ handle_in(
process_disconnect(ReasonCode, Properties, NChannel);
handle_in(?AUTH_PACKET(), Channel) ->
handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(shutdown_count(frame_error, Reason), Channel);
handle_in(
{frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting}
) ->
shutdown(
shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
handle_in(
{frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
{ok, Channel};
handle_in({frame_error, Reason}, Channel) ->
handle_frame_error(Reason, Channel);
handle_in(Packet, Channel) ->
?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
@ -1021,6 +1002,68 @@ not_nacked({deliver, _Topic, Msg}) ->
true
end.
%%--------------------------------------------------------------------
%% Handle Frame Error
%%--------------------------------------------------------------------
handle_frame_error(
Reason = #{cause := frame_too_large},
Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
) when
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
ShutdownCount = shutdown_count(frame_error, Reason),
case proto_ver(Reason, ConnInfo) of
?MQTT_PROTO_V5 ->
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
_ ->
shutdown(ShutdownCount, Channel)
end;
%% Only send CONNACK with reason code `frame_too_large` for MQTT-v5.0 when connecting,
%% otherwise DONOT send any CONNACK or DISCONNECT packet.
handle_frame_error(
Reason,
Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
) when
is_map(Reason) andalso
(ConnState == idle orelse ConnState == connecting)
->
ShutdownCount = shutdown_count(frame_error, Reason),
ProtoVer = proto_ver(Reason, ConnInfo),
NChannel = Channel#channel{conninfo = ConnInfo#{proto_ver => ProtoVer}},
case ProtoVer of
?MQTT_PROTO_V5 ->
shutdown(ShutdownCount, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), NChannel);
_ ->
shutdown(ShutdownCount, NChannel)
end;
handle_frame_error(
Reason,
Channel = #channel{conn_state = connecting}
) ->
shutdown(
shutdown_count(frame_error, Reason),
?CONNACK_PACKET(?RC_MALFORMED_PACKET),
Channel
);
handle_frame_error(
Reason,
Channel = #channel{conn_state = ConnState}
) when
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
handle_out(
disconnect,
{?RC_MALFORMED_PACKET, Reason},
Channel
);
handle_frame_error(
Reason,
Channel = #channel{conn_state = disconnected}
) ->
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle outgoing packet
%%--------------------------------------------------------------------
@ -1289,7 +1332,7 @@ handle_info(
session = Session
}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
{Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
@ -2636,8 +2679,7 @@ save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) ->
NAliases = maps:put(Topic, AliasId, Aliases),
TopicAliases#{outbound => NAliases}.
-compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}).
-compile({inline, [reply/2, shutdown/2, shutdown/3]}).
reply(Reply, Channel) ->
{reply, Reply, Channel}.
@ -2673,13 +2715,13 @@ disconnect_and_shutdown(
?IS_MQTT_V5 =
#channel{conn_state = ConnState}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
%% mqtt v3/v4 connected sessions
disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, Reply, NChannel);
@ -2722,6 +2764,13 @@ is_durable_session(#channel{session = Session}) ->
false
end.
proto_ver(#{proto_ver := ProtoVer}, _ConnInfo) ->
ProtoVer;
proto_ver(_Reason, #{proto_ver := ProtoVer}) ->
ProtoVer;
proto_ver(_, _) ->
?MQTT_PROTO_V4.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------

View File

@ -783,7 +783,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
input_bytes => Data,
parsed_packets => Packets
}),
{[{frame_error, Reason} | Packets], State};
NState = enrich_state(Reason, State),
{[{frame_error, Reason} | Packets], NState};
error:Reason:Stacktrace ->
?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState),
@ -1227,6 +1228,12 @@ inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc),
ok.
enrich_state(#{parse_state := NParseState}, State) ->
Serialize = emqx_frame:serialize_opts(NParseState),
State#state{parse_state = NParseState, serialize = Serialize};
enrich_state(_, State) ->
State.
set_tcp_keepalive({quic, _Listener}) ->
ok;
set_tcp_keepalive({Type, Id}) ->

View File

@ -117,6 +117,13 @@ try_subscribe(ClientId, Topic) ->
write
),
allow;
[#exclusive_subscription{clientid = ClientId, topic = Topic}] ->
%% Fixed the issue-13476
%% In this feature, the user must manually call `unsubscribe` to release the lock,
%% but sometimes the node may go down for some reason,
%% then the client will reconnect to this node and resubscribe.
%% We need to allow resubscription, otherwise the lock will never be released.
allow;
[_] ->
deny
end.

View File

@ -43,7 +43,9 @@
add_shared_route/2,
delete_shared_route/2,
add_persistent_route/2,
delete_persistent_route/2
delete_persistent_route/2,
add_persistent_shared_route/3,
delete_persistent_shared_route/3
]).
-export_type([dest/0]).
@ -129,6 +131,12 @@ add_persistent_route(Topic, ID) ->
delete_persistent_route(Topic, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
add_persistent_shared_route(Topic, Group, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
delete_persistent_shared_route(Topic, Group, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -267,28 +267,50 @@ packet(Header, Variable) ->
packet(Header, Variable, Payload) ->
#mqtt_packet{header = Header, variable = Variable, payload = Payload}.
parse_connect(FrameBin, StrictMode) ->
{ProtoName, Rest} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
case ProtoName of
<<"MQTT">> ->
ok;
<<"MQIsdp">> ->
ok;
_ ->
%% from spec: the server MAY send disconnect with reason code 0x84
%% we chose to close socket because the client is likely not talking MQTT anyway
?PARSE_ERR(#{
cause => invalid_proto_name,
expected => <<"'MQTT' or 'MQIsdp'">>,
received => ProtoName
})
end,
parse_connect2(ProtoName, Rest, StrictMode).
parse_connect(FrameBin, Options = #{strict_mode := StrictMode}) ->
{ProtoName, Rest0} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
%% No need to parse and check proto_ver if proto_name is invalid, check it first
%% And the matching check of `proto_name` and `proto_ver` fields will be done in `emqx_packet:check_proto_ver/2`
_ = validate_proto_name(ProtoName),
{IsBridge, ProtoVer, Rest2} = parse_connect_proto_ver(Rest0),
NOptions = Options#{version => ProtoVer},
try
do_parse_connect(ProtoName, IsBridge, ProtoVer, Rest2, StrictMode)
catch
throw:{?FRAME_PARSE_ERROR, ReasonM} when is_map(ReasonM) ->
?PARSE_ERR(
ReasonM#{
proto_ver => ProtoVer,
proto_name => ProtoName,
parse_state => ?NONE(NOptions)
}
);
throw:{?FRAME_PARSE_ERROR, Reason} ->
?PARSE_ERR(
#{
cause => Reason,
proto_ver => ProtoVer,
proto_name => ProtoName,
parse_state => ?NONE(NOptions)
}
)
end.
parse_connect2(
do_parse_connect(
ProtoName,
<<BridgeTag:4, ProtoVer:4, UsernameFlagB:1, PasswordFlagB:1, WillRetainB:1, WillQoS:2,
WillFlagB:1, CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>,
IsBridge,
ProtoVer,
<<
UsernameFlagB:1,
PasswordFlagB:1,
WillRetainB:1,
WillQoS:2,
WillFlagB:1,
CleanStart:1,
Reserved:1,
KeepAlive:16/big,
Rest/binary
>>,
StrictMode
) ->
_ = validate_connect_reserved(Reserved),
@ -303,14 +325,14 @@ parse_connect2(
UsernameFlag = bool(UsernameFlagB),
PasswordFlag = bool(PasswordFlagB)
),
{Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
{Properties, Rest3} = parse_properties(Rest, ProtoVer, StrictMode),
{ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid),
ConnPacket = #mqtt_packet_connect{
proto_name = ProtoName,
proto_ver = ProtoVer,
%% For bridge mode, non-standard implementation
%% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html
is_bridge = (BridgeTag =:= 8),
is_bridge = IsBridge,
clean_start = bool(CleanStart),
will_flag = WillFlag,
will_qos = WillQoS,
@ -343,16 +365,16 @@ parse_connect2(
unexpected_trailing_bytes => size(Rest7)
})
end;
parse_connect2(_ProtoName, Bin, _StrictMode) ->
%% sent less than 32 bytes
do_parse_connect(_ProtoName, _IsBridge, _ProtoVer, Bin, _StrictMode) ->
%% sent less than 24 bytes
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
parse_packet(
#mqtt_packet_header{type = ?CONNECT},
FrameBin,
#{strict_mode := StrictMode}
Options
) ->
parse_connect(FrameBin, StrictMode);
parse_connect(FrameBin, Options);
parse_packet(
#mqtt_packet_header{type = ?CONNACK},
<<AckFlags:8, ReasonCode:8, Rest/binary>>,
@ -516,6 +538,12 @@ parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
parse_packet_id(_) ->
?PARSE_ERR(invalid_packet_id).
parse_connect_proto_ver(<<BridgeTag:4, ProtoVer:4, Rest/binary>>) ->
{_IsBridge = (BridgeTag =:= 8), ProtoVer, Rest};
parse_connect_proto_ver(Bin) ->
%% sent less than 1 bytes or empty
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 ->
{#{}, Bin};
%% TODO: version mess?
@ -739,6 +767,8 @@ serialize_fun(#{version := Ver, max_size := MaxSize, strict_mode := StrictMode})
initial_serialize_opts(Opts) ->
maps:merge(?DEFAULT_OPTIONS, Opts).
serialize_opts(?NONE(Options)) ->
maps:merge(?DEFAULT_OPTIONS, Options);
serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
#{version => ProtoVer, max_size => MaxSize, strict_mode => false}.
@ -1157,18 +1187,34 @@ validate_subqos([3 | _]) -> ?PARSE_ERR(bad_subqos);
validate_subqos([_ | T]) -> validate_subqos(T);
validate_subqos([]) -> ok.
%% from spec: the server MAY send disconnect with reason code 0x84
%% we chose to close socket because the client is likely not talking MQTT anyway
validate_proto_name(<<"MQTT">>) ->
ok;
validate_proto_name(<<"MQIsdp">>) ->
ok;
validate_proto_name(ProtoName) ->
?PARSE_ERR(#{
cause => invalid_proto_name,
expected => <<"'MQTT' or 'MQIsdp'">>,
received => ProtoName
}).
%% MQTT-v3.1.1-[MQTT-3.1.2-3], MQTT-v5.0-[MQTT-3.1.2-3]
-compile({inline, [validate_connect_reserved/1]}).
validate_connect_reserved(0) -> ok;
validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag).
-compile({inline, [validate_connect_will/3]}).
%% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
validate_connect_will(false, _, WillQos) when WillQos > 0 -> ?PARSE_ERR(invalid_will_qos);
validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos);
%% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos);
%% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
validate_connect_will(false, WillRetain, _) when WillRetain -> ?PARSE_ERR(invalid_will_retain);
validate_connect_will(_, _, _) -> ok.
-compile({inline, [validate_connect_password_flag/4]}).
%% MQTT-v3.1
%% Username flag and password flag are not strongly related
%% https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect
@ -1183,6 +1229,7 @@ validate_connect_password_flag(true, ?MQTT_PROTO_V5, _, _) ->
validate_connect_password_flag(_, _, _, _) ->
ok.
-compile({inline, [bool/1]}).
bool(0) -> false;
bool(1) -> true.

View File

@ -432,7 +432,7 @@ do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTE
esockd:open(
Id,
ListenOn,
merge_default(esockd_opts(Id, Type, Name, Opts))
merge_default(esockd_opts(Id, Type, Name, Opts, _OldOpts = undefined))
);
%% Start MQTT/WS listener
do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
@ -476,7 +476,7 @@ do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when
Id = listener_id(Type, Name),
case maps:get(bind, OldConf) of
ListenOn ->
esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf, OldConf));
_Different ->
%% TODO
%% Again, we're not strictly required to drop live connections in this case.
@ -588,7 +588,7 @@ perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) ->
perform_listener_change(stop, {Type, Name, Conf}) ->
stop_listener(Type, Name, Conf).
esockd_opts(ListenerId, Type, Name, Opts0) ->
esockd_opts(ListenerId, Type, Name, Opts0, OldOpts) ->
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
Limiter = limiter(Opts0),
Opts2 =
@ -620,7 +620,7 @@ esockd_opts(ListenerId, Type, Name, Opts0) ->
tcp ->
Opts3#{tcp_options => tcp_opts(Opts0)};
ssl ->
OptsWithCRL = inject_crl_config(Opts0),
OptsWithCRL = inject_crl_config(Opts0, OldOpts),
OptsWithSNI = inject_sni_fun(ListenerId, OptsWithCRL),
OptsWithRootFun = inject_root_fun(OptsWithSNI),
OptsWithVerifyFun = inject_verify_fun(OptsWithRootFun),
@ -996,7 +996,7 @@ inject_sni_fun(_ListenerId, Conf) ->
Conf.
inject_crl_config(
Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}
Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}, _OldOpts
) ->
HTTPTimeout = emqx_config:get([crl_cache, http_timeout], timer:seconds(15)),
Conf#{
@ -1006,7 +1006,16 @@ inject_crl_config(
crl_cache => {emqx_ssl_crl_cache, {internal, [{http, HTTPTimeout}]}}
}
};
inject_crl_config(Conf) ->
inject_crl_config(#{ssl_options := SSLOpts0} = Conf0, #{} = OldOpts) ->
%% Note: we must set crl options to `undefined' to unset them. Otherwise,
%% `esockd' will retain such options when `esockd:merge_opts/2' is called and the SSL
%% options were previously enabled.
WasEnabled = emqx_utils_maps:deep_get([ssl_options, enable_crl_check], OldOpts, false),
Undefine = fun(Acc, K) -> emqx_utils_maps:put_if(Acc, K, undefined, WasEnabled) end,
SSLOpts1 = Undefine(SSLOpts0, crl_check),
SSLOpts = Undefine(SSLOpts1, crl_cache),
Conf0#{ssl_options := SSLOpts};
inject_crl_config(Conf, undefined = _OldOpts) ->
Conf.
maybe_unregister_ocsp_stapling_refresh(

View File

@ -25,7 +25,7 @@
-export([start_link/0]).
%% throttler API
-export([allow/1]).
-export([allow/2]).
%% gen_server callbacks
-export([
@ -40,23 +40,29 @@
-define(SEQ_ID(Msg), {?MODULE, Msg}).
-define(NEW_SEQ, atomics:new(1, [{signed, false}])).
-define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)).
-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))).
-define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)).
-define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)).
-define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1).
-define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1).
-define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)).
-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
-spec allow(atom()) -> boolean().
allow(Msg) when is_atom(Msg) ->
%% @doc Check if a throttled log message is allowed to pass down to the logger this time.
%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined'
%% for predefined message IDs.
%% For relatively static resources created from configurations such as data integration
%% resource IDs `UniqueKey' should be of `binary()' type.
-spec allow(atom(), undefined | binary()) -> boolean().
allow(Msg, UniqueKey) when
is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined)
->
case emqx_logger:get_primary_log_level() of
debug ->
true;
_ ->
do_allow(Msg)
do_allow(Msg, UniqueKey)
end.
-spec start_link() -> startlink_ret().
@ -68,7 +74,8 @@ start_link() ->
%%--------------------------------------------------------------------
init([]) ->
ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
process_flag(trap_exit, true),
ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST),
CurrentPeriodMs = ?TIME_WINDOW_MS,
TimerRef = schedule_refresh(CurrentPeriodMs),
{ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
@ -86,16 +93,22 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
DroppedStats = lists:foldl(
fun(Msg, Acc) ->
case ?GET_SEQ(Msg) of
%% Should not happen, unless the static ids list is updated at run-time.
undefined ->
?NEW_THROTTLE(Msg, ?NEW_SEQ),
%% Should not happen, unless the static ids list is updated at run-time.
new_throttler(Msg),
?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
Acc;
SeqMap when is_map(SeqMap) ->
maps:fold(
fun(Key, Ref, Acc0) ->
ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]),
drop_stats(Ref, ID, Acc0)
end,
Acc,
SeqMap
);
SeqRef ->
Dropped = ?GET_DROPPED(SeqRef),
ok = ?RESET_SEQ(SeqRef),
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
maybe_add_dropped(Msg, Dropped, Acc)
drop_stats(SeqRef, Msg, Acc)
end
end,
#{},
@ -112,7 +125,16 @@ handle_info(Info, State) ->
?SLOG(error, #{msg => "unxpected_info", info => Info}),
{noreply, State}.
drop_stats(SeqRef, Msg, Acc) ->
Dropped = ?GET_DROPPED(SeqRef),
ok = ?RESET_SEQ(SeqRef),
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
maybe_add_dropped(Msg, Dropped, Acc).
terminate(_Reason, _State) ->
%% atomics do not have delete/remove/release/deallocate API
%% after the reference is garbage-collected the resource is released
lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST),
ok.
code_change(_OldVsn, State, _Extra) ->
@ -122,17 +144,27 @@ code_change(_OldVsn, State, _Extra) ->
%% internal functions
%%--------------------------------------------------------------------
do_allow(Msg) ->
do_allow(Msg, UniqueKey) ->
case persistent_term:get(?SEQ_ID(Msg), undefined) of
undefined ->
%% This is either a race condition (emqx_log_throttler is not started yet)
%% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is
%% not added to the default value of `log.throttling.msgs`.
?SLOG(info, #{
msg => "missing_log_throttle_sequence",
?SLOG(debug, #{
msg => "log_throttle_disabled",
throttled_msg => Msg
}),
true;
%% e.g: unrecoverable msg throttle according resource_id
SeqMap when is_map(SeqMap) ->
case maps:find(UniqueKey, SeqMap) of
{ok, SeqRef} ->
?IS_ALLOWED(SeqRef);
error ->
SeqRef = ?NEW_SEQ,
new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}),
true
end;
SeqRef ->
?IS_ALLOWED(SeqRef)
end.
@ -154,3 +186,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) ->
schedule_refresh(PeriodMs) ->
?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}),
erlang:send_after(PeriodMs, ?MODULE, refresh).
new_throttler(unrecoverable_resource_error = Msg) ->
new_throttler(Msg, #{});
new_throttler(Msg) ->
new_throttler(Msg, ?NEW_SEQ).
new_throttler(Msg, AtomicOrEmptyMap) ->
persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap).

View File

@ -105,7 +105,7 @@ format(Msg, Meta, Config) ->
maybe_format_msg(undefined, _Meta, _Config) ->
#{};
maybe_format_msg({report, Report0} = Msg, #{report_cb := Cb} = Meta, Config) ->
Report = emqx_logger_textfmt:try_encode_payload(Report0, Config),
Report = emqx_logger_textfmt:try_encode_meta(Report0, Config),
case is_map(Report) andalso Cb =:= ?DEFAULT_FORMATTER of
true ->
%% reporting a map without a customised format function

View File

@ -20,7 +20,7 @@
-export([format/2]).
-export([check_config/1]).
-export([try_format_unicode/1, try_encode_payload/2]).
-export([try_format_unicode/1, try_encode_meta/2]).
%% Used in the other log formatters
-export([evaluate_lazy_values_if_dbg_level/1, evaluate_lazy_values/1]).
@ -111,7 +111,7 @@ is_list_report_acceptable(_) ->
enrich_report(ReportRaw0, Meta, Config) ->
%% clientid and peername always in emqx_conn's process metadata.
%% topic and username can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2
ReportRaw = try_encode_payload(ReportRaw0, Config),
ReportRaw = try_encode_meta(ReportRaw0, Config),
Topic =
case maps:get(topic, Meta, undefined) of
undefined -> maps:get(topic, ReportRaw, undefined);
@ -180,9 +180,22 @@ enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) ->
enrich_topic(Msg, _) ->
Msg.
try_encode_payload(#{payload := Payload} = Report, #{payload_encode := Encode}) ->
try_encode_meta(Report, Config) ->
lists:foldl(
fun(Meta, Acc) ->
try_encode_meta(Meta, Acc, Config)
end,
Report,
[payload, packet]
).
try_encode_meta(payload, #{payload := Payload} = Report, #{payload_encode := Encode}) ->
Report#{payload := encode_payload(Payload, Encode)};
try_encode_payload(Report, _Config) ->
try_encode_meta(packet, #{packet := Packet} = Report, #{payload_encode := Encode}) when
is_tuple(Packet)
->
Report#{packet := emqx_packet:format(Packet, Encode)};
try_encode_meta(_, Report, _Config) ->
Report.
encode_payload(Payload, text) ->
@ -190,4 +203,5 @@ encode_payload(Payload, text) ->
encode_payload(_Payload, hidden) ->
"******";
encode_payload(Payload, hex) ->
binary:encode_hex(Payload).
Bin = emqx_utils_conv:bin(Payload),
binary:encode_hex(Bin).

View File

@ -51,7 +51,6 @@
]).
-export([
format/1,
format/2
]).
@ -481,10 +480,6 @@ will_msg(#mqtt_packet_connect{
headers = #{username => Username, properties => Props}
}.
%% @doc Format packet
-spec format(emqx_types:packet()) -> iolist().
format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()).
%% @doc Format packet
-spec format(emqx_types:packet(), hex | text | hidden) -> iolist().
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) ->

View File

@ -621,9 +621,13 @@ handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
Session = replay_streams(Session0, ClientInfo),
{ok, [], Session};
handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) ->
S1 = emqx_persistent_session_ds_subs:gc(S0),
S2 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S2, SharedSubS0),
%% `gc` and `renew_streams` methods may drop unsubscribed streams.
%% Shared subscription handler must have a chance to see unsubscribed streams
%% in the fully replayed state.
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:pre_renew_streams(S0, SharedSubS0),
S2 = emqx_persistent_session_ds_subs:gc(S1),
S3 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2),
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S3, SharedSubS1),
Interval = get_config(ClientInfo, [renew_streams_interval]),
Session = emqx_session:ensure_timer(
?TIMER_GET_STREAMS,
@ -757,7 +761,7 @@ skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
%%--------------------------------------------------------------------
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
disconnect(Session = #{id := Id, s := S0, shared_sub_s := SharedSubS0}, ConnInfo) ->
S1 = maybe_set_offline_info(S0, Id),
S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1),
S3 =
@ -767,8 +771,9 @@ disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
_ ->
S2
end,
S = emqx_persistent_session_ds_state:commit(S3),
{shutdown, Session#{s => S}}.
{S4, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_disconnect(S3, SharedSubS0),
S = emqx_persistent_session_ds_state:commit(S4),
{shutdown, Session#{s => S, shared_sub_s => SharedSubS}}.
-spec terminate(Reason :: term(), session()) -> ok.
terminate(_Reason, Session = #{id := Id, s := S}) ->
@ -816,10 +821,12 @@ list_client_subscriptions(ClientId) ->
{error, not_found}
end.
-spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) ->
-spec get_client_subscription(emqx_types:clientid(), topic_filter() | share_topic_filter()) ->
subscription() | undefined.
get_client_subscription(ClientId, Topic) ->
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic).
get_client_subscription(ClientId, #share{} = ShareTopicFilter) ->
emqx_persistent_session_ds_shared_subs:cold_get_subscription(ClientId, ShareTopicFilter);
get_client_subscription(ClientId, TopicFilter) ->
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, TopicFilter).
%%--------------------------------------------------------------------
%% Session tables operations
@ -986,14 +993,14 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
%% Normal replay:
%%--------------------------------------------------------------------
fetch_new_messages(Session0 = #{s := S0}, ClientInfo) ->
LFS = maps:get(last_fetched_stream, Session0, beginning),
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S0),
fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) ->
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0),
Session1 = Session0#{s => S1, shared_sub_s => SharedSubS1},
LFS = maps:get(last_fetched_stream, Session1, beginning),
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1),
BatchSize = get_config(ClientInfo, [batch_size]),
Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo),
#{s := S1, shared_sub_s := SharedSubS0} = Session1,
{S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0),
Session1#{s => S2, shared_sub_s => SharedSubS1}.
Session2 = fetch_new_messages(ItStream, BatchSize, Session1, ClientInfo),
Session2#{shared_sub_s => SharedSubS1}.
fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
#{inflight := Inflight} = Session0,

View File

@ -17,7 +17,7 @@
-module(emqx_persistent_session_ds_router).
-include("emqx.hrl").
-include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
-include("emqx_ps_ds_int.hrl").
-export([init_tables/0]).
@ -47,7 +47,7 @@
-endif.
-type route() :: #ps_route{}.
-type dest() :: emqx_persistent_session_ds:id().
-type dest() :: emqx_persistent_session_ds:id() | #share_dest{}.
-export_type([dest/0, route/0]).
@ -161,7 +161,7 @@ topics() ->
print_routes(Topic) ->
lists:foreach(
fun(#ps_route{topic = To, dest = Dest}) ->
io:format("~ts -> ~ts~n", [To, Dest])
io:format("~ts -> ~tp~n", [To, Dest])
end,
match_routes(Topic)
).
@ -247,6 +247,8 @@ mk_filtertab_fold_fun(FoldFun) ->
match_filters(Topic) ->
emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []).
get_dest_session_id(#share_dest{session_id = DSSessionId}) ->
DSSessionId;
get_dest_session_id({_, DSSessionId}) ->
DSSessionId;
get_dest_session_id(DSSessionId) ->

View File

@ -2,11 +2,37 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc This module
%% * handles creation and management of _shared_ subscriptions for the session;
%% * provides streams to the session;
%% * handles progress of stream replay.
%%
%% The logic is quite straightforward; most of the parts resemble the logic of the
%% `emqx_persistent_session_ds_subs` (subscribe/unsubscribe) and
%% `emqx_persistent_session_ds_scheduler` (providing new streams),
%% but some data is sent or received from the `emqx_persistent_session_ds_shared_subs_agent`
%% which communicates with remote shared subscription leaders.
%%
%% A tricky part is the concept of "scheduled actions". When we unsubscribe from a topic
%% we may have some streams that have unacked messages. So we do not have a reliable
%% progress for them. Sending the current progress to the leader and disconnecting
%% will lead to the duplication of messages. So after unsubscription, we need to wait
%% some time until all streams are acked, and only then we disconnect from the leader.
%%
%% For this purpose we have the `scheduled_actions` map in the state of the module.
%% We preserve there the streams that we need to wait for and collect their progress.
%% We also use `scheduled_actions` for resubscriptions. If a client quickly resubscribes
%% after unsubscription, we may still have the mentioned streams unacked. If we abandon
%% them, just connect to the leader, then it may lease us the same streams again, but with
%% the previous progress. So messages may duplicate.
-module(emqx_persistent_session_ds_shared_subs).
-include("emqx_mqtt.hrl").
-include("emqx.hrl").
-include("logger.hrl").
-include("session_internals.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-export([
@ -15,16 +41,56 @@
on_subscribe/3,
on_unsubscribe/4,
on_disconnect/2,
on_streams_replayed/2,
on_streams_replay/2,
on_info/3,
pre_renew_streams/2,
renew_streams/2,
to_map/2
]).
%% Management API:
-export([
cold_get_subscription/2
]).
-export([
format_lease_events/1,
format_stream_progresses/1
]).
-define(schedule_subscribe, schedule_subscribe).
-define(schedule_unsubscribe, schedule_unsubscribe).
-type stream_key() :: {emqx_persistent_session_ds:id(), emqx_ds:stream()}.
-type scheduled_action_type() ::
{?schedule_subscribe, emqx_types:subopts()} | ?schedule_unsubscribe.
-type agent_stream_progress() :: #{
stream := emqx_ds:stream(),
progress := progress(),
use_finished := boolean()
}.
-type progress() ::
#{
iterator := emqx_ds:iterator()
}.
-type scheduled_action() :: #{
type := scheduled_action_type(),
stream_keys_to_wait := [stream_key()],
progresses := [agent_stream_progress()]
}.
-type t() :: #{
agent := emqx_persistent_session_ds_shared_subs_agent:t()
agent := emqx_persistent_session_ds_shared_subs_agent:t(),
scheduled_actions := #{
share_topic_filter() => scheduled_action()
}
}.
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{
@ -34,184 +100,90 @@
-define(rank_x, rank_shared).
-define(rank_y, 0).
-export_type([
progress/0,
agent_stream_progress/0
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% new
-spec new(opts()) -> t().
new(Opts) ->
#{
agent => emqx_persistent_session_ds_shared_subs_agent:new(
agent_opts(Opts)
)
),
scheduled_actions => #{}
}.
%%--------------------------------------------------------------------
%% open
-spec open(emqx_persistent_session_ds_state:t(), opts()) ->
{ok, emqx_persistent_session_ds_state:t(), t()}.
open(S, Opts) ->
open(S0, Opts) ->
SharedSubscriptions = fold_shared_subs(
fun(#share{} = TopicFilter, Sub, Acc) ->
[{TopicFilter, to_agent_subscription(S, Sub)} | Acc]
fun(#share{} = ShareTopicFilter, Sub, Acc) ->
[{ShareTopicFilter, to_agent_subscription(S0, Sub)} | Acc]
end,
[],
S
S0
),
Agent = emqx_persistent_session_ds_shared_subs_agent:open(
SharedSubscriptions, agent_opts(Opts)
),
SharedSubS = #{agent => Agent},
{ok, S, SharedSubS}.
SharedSubS = #{agent => Agent, scheduled_actions => #{}},
S1 = revoke_all_streams(S0),
{ok, S1, SharedSubS}.
%%--------------------------------------------------------------------
%% on_subscribe
-spec on_subscribe(
share_topic_filter(),
emqx_types:subopts(),
emqx_persistent_session_ds:session()
) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}.
on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) ->
Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S),
on_subscribe(Subscription, TopicFilter, SubOpts, Session).
-spec on_unsubscribe(
emqx_persistent_session_ds:id(),
emqx_persistent_session_ds:topic_filter(),
emqx_persistent_session_ds_state:t(),
t()
) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}.
on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) ->
case lookup(TopicFilter, S0) of
undefined ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
Subscription ->
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, topic_filter => TopicFilter
}),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent0, TopicFilter
),
SharedSubS = SharedSubS0#{agent => Agent1},
S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0),
{ok, S, SharedSubS, Subscription}
end.
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0
),
?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}),
S1 = lists:foldl(
fun
(#{type := lease} = Event, S) -> accept_stream(Event, S);
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
end,
S0,
StreamLeaseEvents
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S1, SharedSubS1}.
-spec on_streams_replayed(
emqx_persistent_session_ds_state:t(),
t()
) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
%% TODO
%% Is it sufficient for a report?
Progress = fold_shared_stream_states(
fun(TopicFilter, Stream, SRS, Acc) ->
#srs{it_begin = BeginIt} = SRS,
StreamProgress = #{
topic_filter => TopicFilter,
stream => Stream,
iterator => BeginIt
},
[StreamProgress | Acc]
end,
[],
S
),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progress
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
{emqx_persistent_session_ds_state:t(), t()}.
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
to_map(_S, _SharedSubS) ->
%% TODO
#{}.
on_subscribe(#share{} = ShareTopicFilter, SubOpts, #{s := S} = Session) ->
Subscription = emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S),
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% on_subscribe internal functions
fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(
fun
(#share{} = TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0);
(_, _Sub, Acc0) -> Acc0
end,
Acc,
S
).
fold_shared_stream_states(Fun, Acc, S) ->
%% TODO
%% Optimize or cache
TopicFilters = fold_shared_subs(
fun
(#share{} = TopicFilter, #{id := Id} = _Sub, Acc0) ->
Acc0#{Id => TopicFilter};
(_, _, Acc0) ->
Acc0
end,
#{},
S
),
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, Stream}, SRS, Acc0) ->
case TopicFilters of
#{SubId := TopicFilter} ->
Fun(TopicFilter, Stream, SRS, Acc0);
_ ->
Acc0
end
end,
Acc,
S
).
on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
on_subscribe(undefined, ShareTopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
#{max_subscriptions := MaxSubscriptions} = Props,
case emqx_persistent_session_ds_state:n_subscriptions(S) < MaxSubscriptions of
true ->
create_new_subscription(TopicFilter, SubOpts, Session);
create_new_subscription(ShareTopicFilter, SubOpts, Session);
false ->
{error, ?RC_QUOTA_EXCEEDED}
end;
on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
update_subscription(Subscription, TopicFilter, SubOpts, Session).
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) ->
update_subscription(Subscription, ShareTopicFilter, SubOpts, Session).
-dialyzer({nowarn_function, create_new_subscription/3}).
create_new_subscription(TopicFilter, SubOpts, #{
id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props
create_new_subscription(#share{topic = TopicFilter, group = Group} = ShareTopicFilter, SubOpts, #{
id := SessionId,
s := S0,
shared_sub_s := #{agent := Agent} = SharedSubS0,
props := Props
}) ->
case
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent0, TopicFilter, SubOpts
emqx_persistent_session_ds_shared_subs_agent:can_subscribe(
Agent, ShareTopicFilter, SubOpts
)
of
{ok, Agent1} ->
ok ->
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, #share_dest{
session_id = SessionId, group = Group
}),
_ = emqx_external_broker:add_persistent_shared_route(TopicFilter, Group, SessionId),
#{upgrade_qos := UpgradeQoS} = Props,
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
@ -227,20 +199,20 @@ create_new_subscription(TopicFilter, SubOpts, #{
start_time => now_ms()
},
S = emqx_persistent_session_ds_state:put_subscription(
TopicFilter, Subscription, S3
ShareTopicFilter, Subscription, S3
),
SharedSubS = SharedSubS0#{agent => Agent1},
?tp(persistent_session_ds_shared_subscription_added, #{
topic_filter => TopicFilter, session => SessionId
}),
SharedSubS = schedule_subscribe(SharedSubS0, ShareTopicFilter, SubOpts),
{ok, S, SharedSubS};
{error, _} = Error ->
Error
end.
update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilter, SubOpts, #{
s := S0, shared_sub_s := SharedSubS, props := Props
}) ->
update_subscription(
#{current_state := SStateId0, id := SubId} = Sub0, ShareTopicFilter, SubOpts, #{
s := S0, shared_sub_s := SharedSubS, props := Props
}
) ->
#{upgrade_qos := UpgradeQoS} = Props,
SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
@ -254,36 +226,173 @@ update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilt
SStateId, SState, S1
),
Sub = Sub0#{current_state => SStateId},
S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
S = emqx_persistent_session_ds_state:put_subscription(ShareTopicFilter, Sub, S2),
{ok, S, SharedSubS}
end.
lookup(TopicFilter, S) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of
Sub = #{current_state := SStateId} ->
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
#{subopts := SubOpts} ->
Sub#{subopts => SubOpts};
undefined ->
undefined
end;
-dialyzer({nowarn_function, schedule_subscribe/3}).
schedule_subscribe(
#{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0,
ShareTopicFilter,
SubOpts
) ->
case ScheduledActions0 of
#{ShareTopicFilter := ScheduledAction} ->
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
},
?tp(debug, shared_subs_schedule_subscribe_override, #{
share_topic_filter => ShareTopicFilter,
new_type => {?schedule_subscribe, SubOpts},
old_action => format_schedule_action(ScheduledAction)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
?tp(debug, shared_subs_schedule_subscribe_new, #{
share_topic_filter => ShareTopicFilter, subopts => SubOpts
}),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent0, ShareTopicFilter, SubOpts
),
SharedSubS0#{agent => Agent1}
end.
%%--------------------------------------------------------------------
%% on_unsubscribe
-spec on_unsubscribe(
emqx_persistent_session_ds:id(),
share_topic_filter(),
emqx_persistent_session_ds_state:t(),
t()
) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}.
on_unsubscribe(
SessionId, #share{topic = TopicFilter, group = Group} = ShareTopicFilter, S0, SharedSubS0
) ->
case lookup(ShareTopicFilter, S0) of
undefined ->
undefined
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
#{id := SubId} = Subscription ->
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, share_topic_filter => ShareTopicFilter
}),
_ = emqx_external_broker:delete_persistent_shared_route(TopicFilter, Group, SessionId),
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, #share_dest{
session_id = SessionId, group = Group
}),
S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0),
SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter),
{ok, S, SharedSubS, Subscription}
end.
%%--------------------------------------------------------------------
%% on_unsubscribe internal functions
schedule_unsubscribe(
S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, ShareTopicFilter
) ->
case ScheduledActions0 of
#{ShareTopicFilter := ScheduledAction0} ->
ScheduledAction1 = ScheduledAction0#{type => ?schedule_unsubscribe},
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => ScheduledAction1
},
?tp(debug, shared_subs_schedule_unsubscribe_override, #{
share_topic_filter => ShareTopicFilter,
new_type => ?schedule_unsubscribe,
old_action => format_schedule_action(ScheduledAction0)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
StreamKeys = stream_keys_by_sub_id(S, UnsubscridedSubId),
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => #{
type => ?schedule_unsubscribe,
stream_keys_to_wait => StreamKeys,
progresses => []
}
},
?tp(debug, shared_subs_schedule_unsubscribe_new, #{
share_topic_filter => ShareTopicFilter,
stream_keys => format_stream_keys(StreamKeys)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1}
end.
%%--------------------------------------------------------------------
%% pre_renew_streams
-spec pre_renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
pre_renew_streams(S, SharedSubS) ->
on_streams_replay(S, SharedSubS).
%%--------------------------------------------------------------------
%% renew_streams
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = SharedSubS0) ->
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0
),
StreamLeaseEvents =/= [] andalso
?tp(debug, shared_subs_new_stream_lease_events, #{
stream_lease_events => format_lease_events(StreamLeaseEvents)
}),
S1 = lists:foldl(
fun
(#{type := lease} = Event, S) -> accept_stream(Event, S, ScheduledActions);
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
end,
S0,
StreamLeaseEvents
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S1, SharedSubS1}.
%%--------------------------------------------------------------------
%% renew_streams internal functions
accept_stream(#{share_topic_filter := ShareTopicFilter} = Event, S, ScheduledActions) ->
%% If we have a pending action (subscribe or unsubscribe) for this topic filter,
%% we should not accept a stream and start replaying it. We won't use it anyway:
%% * if subscribe is pending, we will reset agent obtain a new lease
%% * if unsubscribe is pending, we will drop connection
case ScheduledActions of
#{ShareTopicFilter := _Action} ->
S;
_ ->
accept_stream(Event, S)
end.
accept_stream(
#{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0
#{
share_topic_filter := ShareTopicFilter,
stream := Stream,
progress := #{iterator := Iterator} = _Progress
} = _Event,
S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
undefined ->
%% This should not happen.
%% Agent should have received unsubscribe callback
%% and should not have passed this stream as a new one
error(new_stream_without_sub);
%% We unsubscribed
S0;
#{id := SubId, current_state := SStateId} ->
Key = {SubId, Stream},
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
undefined ->
NeedCreateStream =
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
undefined ->
true;
#srs{unsubscribed = true} ->
true;
_SRS ->
false
end,
case NeedCreateStream of
true ->
NewSRS =
#srs{
rank_x = ?rank_x,
@ -294,15 +403,15 @@ accept_stream(
},
S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0),
S1;
_SRS ->
false ->
S0
end
end.
revoke_stream(
#{topic_filter := TopicFilter, stream := Stream}, S0
#{share_topic_filter := ShareTopicFilter, stream := Stream}, S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
undefined ->
%% This should not happen.
%% Agent should have received unsubscribe callback
@ -320,19 +429,363 @@ revoke_stream(
end
end.
-spec to_agent_subscription(
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()
%%--------------------------------------------------------------------
%% on_streams_replay
-spec on_streams_replay(
emqx_persistent_session_ds_state:t(),
t()
) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replay(S0, SharedSubS0) ->
{S1, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS1} =
renew_streams(S0, SharedSubS0),
Progresses = all_stream_progresses(S1, Agent0),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progresses
),
{Agent2, ScheduledActions1} = run_scheduled_actions(S1, Agent1, ScheduledActions0),
SharedSubS2 = SharedSubS1#{
agent => Agent2,
scheduled_actions => ScheduledActions1
},
{S1, SharedSubS2}.
%%--------------------------------------------------------------------
%% on_streams_replay internal functions
all_stream_progresses(S, Agent) ->
all_stream_progresses(S, Agent, _NeedUnacked = false).
all_stream_progresses(S, _Agent, NeedUnacked) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
fold_shared_stream_states(
fun(ShareTopicFilter, Stream, SRS, ProgressesAcc0) ->
case
is_stream_started(CommQos1, CommQos2, SRS) and
(NeedUnacked or is_stream_fully_acked(CommQos1, CommQos2, SRS))
of
true ->
StreamProgress = stream_progress(CommQos1, CommQos2, Stream, SRS),
maps:update_with(
ShareTopicFilter,
fun(Progresses) -> [StreamProgress | Progresses] end,
[StreamProgress],
ProgressesAcc0
);
false ->
ProgressesAcc0
end
end,
#{},
S
).
run_scheduled_actions(S, Agent, ScheduledActions) ->
maps:fold(
fun(ShareTopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) ->
case run_scheduled_action(S, AgentAcc0, ShareTopicFilter, Action0) of
{ok, AgentAcc1} ->
{AgentAcc1, maps:remove(ShareTopicFilter, ScheduledActionsAcc)};
{continue, Action1} ->
{AgentAcc0, ScheduledActionsAcc#{ShareTopicFilter => Action1}}
end
end,
{Agent, ScheduledActions},
ScheduledActions
).
run_scheduled_action(
S,
Agent0,
ShareTopicFilter,
#{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action
) ->
emqx_persistent_session_ds_shared_subs_agent:subscription().
to_agent_subscription(_S, Subscription) ->
StreamKeysToWait1 = filter_unfinished_streams(S, StreamKeysToWait0),
Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0,
case StreamKeysToWait1 of
[] ->
?tp(debug, shared_subs_schedule_action_complete, #{
share_topic_filter => ShareTopicFilter,
progresses => format_stream_progresses(Progresses1),
type => Type
}),
%% Regular progress won't se unsubscribed streams, so we need to
%% send the progress explicitly.
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, #{ShareTopicFilter => Progresses1}
),
case Type of
{?schedule_subscribe, SubOpts} ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent1, ShareTopicFilter, SubOpts
)};
?schedule_unsubscribe ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent1, ShareTopicFilter, Progresses1
)}
end;
_ ->
Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
?tp(debug, shared_subs_schedule_action_continue, #{
share_topic_filter => ShareTopicFilter,
new_action => format_schedule_action(Action1)
}),
{continue, Action1}
end.
filter_unfinished_streams(S, StreamKeysToWait) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
lists:filter(
fun(Key) ->
case emqx_persistent_session_ds_state:get_stream(Key, S) of
undefined ->
%% This should not happen: we should see any stream
%% in completed state before deletion
true;
SRS ->
not is_stream_fully_acked(CommQos1, CommQos2, SRS)
end
end,
StreamKeysToWait
).
stream_progresses(S, StreamKeys) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
lists:map(
fun({_SubId, Stream} = Key) ->
SRS = emqx_persistent_session_ds_state:get_stream(Key, S),
stream_progress(CommQos1, CommQos2, Stream, SRS)
end,
StreamKeys
).
%%--------------------------------------------------------------------
%% on_disconnect
on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
S1 = revoke_all_streams(S0),
Progresses = all_stream_progresses(S1, Agent0, _NeedUnacked = true),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses),
SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}},
{S1, SharedSubS1}.
%%--------------------------------------------------------------------
%% on_disconnect helpers
revoke_all_streams(S0) ->
fold_shared_stream_states(
fun(ShareTopicFilter, Stream, _SRS, S) ->
revoke_stream(#{share_topic_filter => ShareTopicFilter, stream => Stream}, S)
end,
S0,
S0
).
%%--------------------------------------------------------------------
%% on_info
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
{emqx_persistent_session_ds_state:t(), t()}.
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
%%--------------------------------------------------------------------
%% to_map
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
to_map(S, _SharedSubS) ->
fold_shared_subs(
fun(ShareTopicFilter, _, Acc) -> Acc#{ShareTopicFilter => lookup(ShareTopicFilter, S)} end,
#{},
S
).
%%--------------------------------------------------------------------
%% cold_get_subscription
-spec cold_get_subscription(emqx_persistent_session_ds:id(), share_topic_filter()) ->
emqx_persistent_session_ds:subscription() | undefined.
cold_get_subscription(SessionId, ShareTopicFilter) ->
case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, ShareTopicFilter) of
[Sub = #{current_state := SStateId}] ->
case
emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId)
of
[#{subopts := Subopts}] ->
Sub#{subopts => Subopts};
_ ->
undefined
end;
_ ->
undefined
end.
%%--------------------------------------------------------------------
%% Generic helpers
%%--------------------------------------------------------------------
lookup(ShareTopicFilter, S) ->
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S) of
Sub = #{current_state := SStateId} ->
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
#{subopts := SubOpts} ->
Sub#{subopts => SubOpts};
undefined ->
undefined
end;
undefined ->
undefined
end.
stream_keys_by_sub_id(S, MatchSubId) ->
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, _Stream} = StreamKey, _SRS, StreamKeys) ->
case SubId of
MatchSubId ->
[StreamKey | StreamKeys];
_ ->
StreamKeys
end
end,
[],
S
).
stream_progress(
CommQos1,
CommQos2,
Stream,
#srs{
it_end = EndIt,
it_begin = BeginIt
} = SRS
) ->
Iterator =
case is_stream_fully_acked(CommQos1, CommQos2, SRS) of
true -> EndIt;
false -> BeginIt
end,
#{
stream => Stream,
progress => #{
iterator => Iterator
},
use_finished => is_use_finished(SRS)
}.
fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(
fun
(#share{} = ShareTopicFilter, Sub, Acc0) -> Fun(ShareTopicFilter, Sub, Acc0);
(_, _Sub, Acc0) -> Acc0
end,
Acc,
S
).
fold_shared_stream_states(Fun, Acc, S) ->
%% TODO
%% do we need anything from sub state?
%% Optimize or cache
ShareTopicFilters = fold_shared_subs(
fun
(#share{} = ShareTopicFilter, #{id := Id} = _Sub, Acc0) ->
Acc0#{Id => ShareTopicFilter};
(_, _, Acc0) ->
Acc0
end,
#{},
S
),
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, Stream}, SRS, Acc0) ->
case ShareTopicFilters of
#{SubId := ShareTopicFilter} ->
Fun(ShareTopicFilter, Stream, SRS, Acc0);
_ ->
Acc0
end
end,
Acc,
S
).
to_agent_subscription(_S, Subscription) ->
maps:with([start_time], Subscription).
-spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts().
agent_opts(#{session_id := SessionId}) ->
#{session_id => SessionId}.
-dialyzer({nowarn_function, now_ms/0}).
now_ms() ->
erlang:system_time(millisecond).
is_use_finished(#srs{unsubscribed = Unsubscribed}) ->
Unsubscribed.
is_stream_started(CommQos1, CommQos2, #srs{first_seqno_qos1 = Q1, last_seqno_qos1 = Q2}) ->
(CommQos1 >= Q1) or (CommQos2 >= Q2).
is_stream_fully_acked(_, _, #srs{
first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2
}) ->
%% Streams where the last chunk doesn't contain any QoS1 and 2
%% messages are considered fully acked:
true;
is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
(Comm1 >= S1) andalso (Comm2 >= S2).
%%--------------------------------------------------------------------
%% Formatters
%%--------------------------------------------------------------------
format_schedule_action(#{
type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait
}) ->
#{
type => Type,
progresses => format_stream_progresses(Progresses),
stream_keys_to_wait => format_stream_keys(StreamKeysToWait)
}.
format_stream_progresses(Streams) ->
lists:map(
fun format_stream_progress/1,
Streams
).
format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
format_progress(#{iterator := Iterator} = Progress) ->
Progress#{iterator => format_opaque(Iterator)}.
format_stream_key(beginning) -> beginning;
format_stream_key({SubId, Stream}) -> {SubId, format_opaque(Stream)}.
format_stream_keys(StreamKeys) ->
lists:map(
fun format_stream_key/1,
StreamKeys
).
format_lease_events(Events) ->
lists:map(
fun format_lease_event/1,
Events
).
format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
format_lease_event(#{stream := Stream} = Event) ->
Event#{stream => format_opaque(Stream)}.
format_opaque(Opaque) ->
erlang:phash2(Opaque).

View File

@ -15,7 +15,7 @@
}.
-type t() :: term().
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{
session_id := session_id()
@ -28,41 +28,44 @@
-type stream_lease() :: #{
type => lease,
%% Used as "external" subscription_id
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
}.
-type stream_revoke() :: #{
type => revoke,
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream()
}.
-type stream_lease_event() :: stream_lease() | stream_revoke().
-type stream_progress() :: #{
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
iterator := emqx_ds:iterator(),
use_finished := boolean()
}.
-export_type([
t/0,
subscription/0,
session_id/0,
stream_lease/0,
stream_lease_event/0,
opts/0
]).
-export([
new/1,
open/2,
can_subscribe/3,
on_subscribe/3,
on_unsubscribe/2,
on_unsubscribe/3,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
@ -77,12 +80,13 @@
%%--------------------------------------------------------------------
-callback new(opts()) -> t().
-callback open([{topic_filter(), subscription()}], opts()) -> t().
-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
{ok, t()} | {error, term()}.
-callback on_unsubscribe(t(), topic_filter()) -> t().
-callback open([{share_topic_filter(), subscription()}], opts()) -> t().
-callback can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
-callback on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
-callback on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
-callback on_disconnect(t(), [stream_progress()]) -> t().
-callback renew_streams(t()) -> {[stream_lease_event()], t()}.
-callback on_stream_progress(t(), [stream_progress()]) -> t().
-callback on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
-callback on_info(t(), term()) -> t().
%%--------------------------------------------------------------------
@ -93,24 +97,31 @@
new(Opts) ->
?shared_subs_agent:new(Opts).
-spec open([{topic_filter(), subscription()}], opts()) -> t().
-spec open([{share_topic_filter(), subscription()}], opts()) -> t().
open(Topics, Opts) ->
?shared_subs_agent:open(Topics, Opts).
-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
{ok, t()} | {error, emqx_types:reason_code()}.
on_subscribe(Agent, TopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts).
-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
can_subscribe(Agent, ShareTopicFilter, SubOpts) ->
?shared_subs_agent:can_subscribe(Agent, ShareTopicFilter, SubOpts).
-spec on_unsubscribe(t(), topic_filter()) -> t().
on_unsubscribe(Agent, TopicFilter) ->
?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
on_subscribe(Agent, ShareTopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, ShareTopicFilter, SubOpts).
-spec on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses) ->
?shared_subs_agent:on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses).
-spec on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
on_disconnect(Agent, StreamProgresses) ->
?shared_subs_agent:on_disconnect(Agent, StreamProgresses).
-spec renew_streams(t()) -> {[stream_lease_event()], t()}.
renew_streams(Agent) ->
?shared_subs_agent:renew_streams(Agent).
-spec on_stream_progress(t(), [stream_progress()]) -> t().
-spec on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
on_stream_progress(Agent, StreamProgress) ->
?shared_subs_agent:on_stream_progress(Agent, StreamProgress).

View File

@ -9,11 +9,13 @@
-export([
new/1,
open/2,
can_subscribe/3,
on_subscribe/3,
on_unsubscribe/2,
on_unsubscribe/3,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
@ -30,10 +32,16 @@ new(_Opts) ->
open(_Topics, _Opts) ->
undefined.
on_subscribe(_Agent, _TopicFilter, _SubOpts) ->
can_subscribe(_Agent, _TopicFilter, _SubOpts) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}.
on_unsubscribe(Agent, _TopicFilter) ->
on_subscribe(Agent, _TopicFilter, _SubOpts) ->
Agent.
on_unsubscribe(Agent, _TopicFilter, _Progresses) ->
Agent.
on_disconnect(Agent, _) ->
Agent.
renew_streams(Agent) ->

View File

@ -399,7 +399,9 @@ new_id(Rec) ->
get_subscription(TopicFilter, Rec) ->
gen_get(?subscriptions, TopicFilter, Rec).
-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
-spec cold_get_subscription(
emqx_persistent_session_ds:id(), emqx_types:topic() | emqx_types:share()
) ->
[emqx_persistent_session_ds_subs:subscription()].
cold_get_subscription(SessionId, Topic) ->
kv_pmap_read(?subscription_tab, SessionId, Topic).

View File

@ -21,7 +21,7 @@
-record(ps_route, {
topic :: binary(),
dest :: emqx_persistent_session_ds:id() | '_'
dest :: emqx_persistent_session_ds_router:dest() | '_'
}).
-record(ps_routeidx, {

View File

@ -21,6 +21,7 @@
%% Till full implementation we need to dispach to the null agent.
%% It will report "not implemented" error for attempts to use shared subscriptions.
-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
% -define(shared_subs_agent, emqx_ds_shared_sub_agent).
%% end of -ifdef(TEST).
-endif.

View File

@ -62,7 +62,7 @@
streams := [{pid(), quicer:stream_handle()}],
%% New stream opts
stream_opts := map(),
%% If conneciton is resumed from session ticket
%% If connection is resumed from session ticket
is_resumed => boolean(),
%% mqtt message serializer config
serialize => undefined,
@ -70,8 +70,8 @@
}.
-type cb_ret() :: quicer_lib:cb_ret().
%% @doc Data streams initializions are started in parallel with control streams, data streams are blocked
%% for the activation from control stream after it is accepted as a legit conneciton.
%% @doc Data streams initializations are started in parallel with control streams, data streams are blocked
%% for the activation from control stream after it is accepted as a legit connection.
%% For security, the initial number of allowed data streams from client should be limited by
%% 'peer_bidi_stream_count` & 'peer_unidi_stream_count`
-spec activate_data_streams(pid(), {
@ -80,7 +80,7 @@
activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
%% @doc conneciton owner init callback
%% @doc connection owner init callback
-spec init(map()) -> {ok, cb_state()}.
init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)});

View File

@ -351,6 +351,7 @@ fields("authz_cache") ->
#{
default => true,
required => true,
importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(fields_cache_enable)
}
)},
@ -387,6 +388,7 @@ fields("flapping_detect") ->
boolean(),
#{
default => false,
%% importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(flapping_detect_enable)
}
)},
@ -423,6 +425,7 @@ fields("force_shutdown") ->
boolean(),
#{
default => true,
importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(force_shutdown_enable)
}
)},
@ -452,6 +455,7 @@ fields("overload_protection") ->
boolean(),
#{
desc => ?DESC(overload_protection_enable),
%% importance => ?IMPORTANCE_NO_DOC,
default => false
}
)},
@ -512,7 +516,11 @@ fields("force_gc") ->
{"enable",
sc(
boolean(),
#{default => true, desc => ?DESC(force_gc_enable)}
#{
default => true,
importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(force_gc_enable)
}
)},
{"count",
sc(
@ -1665,6 +1673,7 @@ fields("durable_sessions") ->
sc(
boolean(), #{
desc => ?DESC(durable_sessions_enable),
%% importance => ?IMPORTANCE_NO_DOC,
default => false
}
)},
@ -1888,6 +1897,7 @@ base_listener(Bind) ->
#{
default => true,
aliases => [enabled],
importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(fields_listener_enabled)
}
)},
@ -2416,6 +2426,7 @@ client_ssl_opts_schema(Defaults) ->
boolean(),
#{
default => false,
%% importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(client_ssl_opts_schema_enable)
}
)},

View File

@ -589,6 +589,14 @@ ensure_valid_options(Options, Versions) ->
ensure_valid_options([], _, Acc) ->
lists:reverse(Acc);
ensure_valid_options([{K, undefined} | T], Versions, Acc) when
K =:= crl_check;
K =:= crl_cache
->
%% Note: we must set crl options to `undefined' to unset them. Otherwise,
%% `esockd' will retain such options when `esockd:merge_opts/2' is called and the SSL
%% options were previously enabled.
ensure_valid_options(T, Versions, [{K, undefined} | Acc]);
ensure_valid_options([{_, undefined} | T], Versions, Acc) ->
ensure_valid_options(T, Versions, Acc);
ensure_valid_options([{_, ""} | T], Versions, Acc) ->

View File

@ -17,7 +17,6 @@
-include("emqx_mqtt.hrl").
-export([format/2]).
-export([format_meta_map/1]).
%% logger_formatter:config/0 is not exported.
-type config() :: map().
@ -43,10 +42,6 @@ format(
format(Event, Config) ->
emqx_logger_textfmt:format(Event, Config).
format_meta_map(Meta) ->
Encode = emqx_trace_handler:payload_encode(),
format_meta_map(Meta, Encode).
format_meta_map(Meta, Encode) ->
format_meta_map(Meta, Encode, [
{packet, fun format_packet/2},

View File

@ -436,6 +436,7 @@ websocket_handle({Frame, _}, State) ->
%% TODO: should not close the ws connection
?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
shutdown(unexpected_ws_frame, State).
websocket_info({call, From, Req}, State) ->
handle_call(From, Req, State);
websocket_info({cast, rate_limit}, State) ->
@ -737,7 +738,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
input_bytes => Data
}),
FrameError = {frame_error, Reason},
{[{incoming, FrameError} | Packets], State};
NState = enrich_state(Reason, State),
{[{incoming, FrameError} | Packets], NState};
error:Reason:Stacktrace ->
?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState),
@ -830,7 +832,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
?LOG(warning, #{
msg => "packet_discarded",
reason => "frame_too_large",
packet => emqx_packet:format(Packet)
packet => Packet
}),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),
@ -1069,6 +1071,13 @@ check_max_connection(Type, Listener) ->
{denny, Reason}
end
end.
enrich_state(#{parse_state := NParseState}, State) ->
Serialize = emqx_frame:serialize_opts(NParseState),
State#state{parse_state = NParseState, serialize = Serialize};
enrich_state(_, State) ->
State.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------

View File

@ -414,24 +414,32 @@ t_handle_in_auth(_) ->
emqx_channel:handle_in(?AUTH_PACKET(), Channel).
t_handle_in_frame_error(_) ->
IdleChannel = channel(#{conn_state => idle}),
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan} =
emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, IdleChannel),
IdleChannelV5 = channel(#{conn_state => idle}),
%% no CONNACK packet for v4
?assertMatch(
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan},
emqx_channel:handle_in(
{frame_error, #{cause => frame_too_large}}, v4(IdleChannelV5)
)
),
ConnectingChan = channel(#{conn_state => connecting}),
ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
{shutdown,
#{
shutdown_count := frame_too_large,
cause := frame_too_large,
limit := 100,
received := 101
},
ConnackPacket,
_} =
?assertMatch(
{shutdown,
#{
shutdown_count := frame_too_large,
cause := frame_too_large,
limit := 100,
received := 101
},
ConnackPacket, _},
emqx_channel:handle_in(
{frame_error, #{cause => frame_too_large, received => 101, limit => 100}},
ConnectingChan
),
)
),
DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
ConnectedChan = channel(#{conn_state => connected}),
?assertMatch(

View File

@ -78,6 +78,7 @@
start_epmd/0,
start_peer/2,
stop_peer/1,
ebin_path/0,
listener_port/2
]).

View File

@ -138,13 +138,14 @@ init_per_testcase(t_refresh_config = TestCase, Config) ->
];
init_per_testcase(TestCase, Config) when
TestCase =:= t_update_listener;
TestCase =:= t_update_listener_enable_disable;
TestCase =:= t_validations
->
ct:timetrap({seconds, 30}),
ok = snabbkaffe:start_trace(),
%% when running emqx standalone tests, we can't use those
%% features.
case does_module_exist(emqx_management) of
case does_module_exist(emqx_mgmt) of
true ->
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
@ -165,7 +166,7 @@ init_per_testcase(TestCase, Config) when
{emqx_conf, #{config => #{listeners => #{ssl => #{default => ListenerConf}}}}},
emqx,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
),
@ -206,6 +207,7 @@ read_crl(Filename) ->
end_per_testcase(TestCase, Config) when
TestCase =:= t_update_listener;
TestCase =:= t_update_listener_enable_disable;
TestCase =:= t_validations
->
Skip = proplists:get_bool(skip_does_not_apply, Config),
@ -1057,3 +1059,104 @@ do_t_validations(_Config) ->
),
ok.
%% Checks that if CRL is ever enabled and then disabled, clients can connect, even if they
%% would otherwise not have their corresponding CRLs cached and fail with `{bad_crls,
%% no_relevant_crls}`.
t_update_listener_enable_disable(Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ct:pal("skipping as this test does not apply in this profile"),
ok;
false ->
do_t_update_listener_enable_disable(Config)
end.
do_t_update_listener_enable_disable(Config) ->
DataDir = ?config(data_dir, Config),
Keyfile = filename:join([DataDir, "server.key.pem"]),
Certfile = filename:join([DataDir, "server.cert.pem"]),
Cacertfile = filename:join([DataDir, "ca-chain.cert.pem"]),
ClientCert = filename:join(DataDir, "client.cert.pem"),
ClientKey = filename:join(DataDir, "client.key.pem"),
ListenerId = "ssl:default",
%% Enable CRL
{ok, {{_, 200, _}, _, ListenerData0}} = get_listener_via_api(ListenerId),
CRLConfig0 =
#{
<<"ssl_options">> =>
#{
<<"keyfile">> => Keyfile,
<<"certfile">> => Certfile,
<<"cacertfile">> => Cacertfile,
<<"enable_crl_check">> => true,
<<"fail_if_no_peer_cert">> => true
}
},
ListenerData1 = emqx_utils_maps:deep_merge(ListenerData0, CRLConfig0),
{ok, {_, _, ListenerData2}} = update_listener_via_api(ListenerId, ListenerData1),
?assertMatch(
#{
<<"ssl_options">> :=
#{
<<"enable_crl_check">> := true,
<<"verify">> := <<"verify_peer">>,
<<"fail_if_no_peer_cert">> := true
}
},
ListenerData2
),
%% Disable CRL
CRLConfig1 =
#{
<<"ssl_options">> =>
#{
<<"keyfile">> => Keyfile,
<<"certfile">> => Certfile,
<<"cacertfile">> => Cacertfile,
<<"enable_crl_check">> => false,
<<"fail_if_no_peer_cert">> => true
}
},
ListenerData3 = emqx_utils_maps:deep_merge(ListenerData2, CRLConfig1),
redbug:start(
[
"esockd_server:get_listener_prop -> return",
"esockd_server:set_listener_prop -> return",
"esockd:merge_opts -> return",
"esockd_listener_sup:set_options -> return",
"emqx_listeners:inject_crl_config -> return"
],
[{msgs, 100}]
),
{ok, {_, _, ListenerData4}} = update_listener_via_api(ListenerId, ListenerData3),
?assertMatch(
#{
<<"ssl_options">> :=
#{
<<"enable_crl_check">> := false,
<<"verify">> := <<"verify_peer">>,
<<"fail_if_no_peer_cert">> := true
}
},
ListenerData4
),
%% Now the client that would be blocked tries to connect and should now be allowed.
{ok, C} = emqtt:start_link([
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
?assertMatch({ok, _}, emqtt:connect(C)),
emqtt:stop(C),
?assertNotReceive({http_get, _}),
ok.

View File

@ -79,6 +79,8 @@
%% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs
-export([schema_module/0, upgrade_raw_conf/1]).
-export([skip_if_oss/0]).
-export_type([appspec/0]).
-export_type([appspec_opts/0]).
@ -389,6 +391,8 @@ default_appspec(emqx_schema_validation, _SuiteOpts) ->
#{schema_mod => emqx_schema_validation_schema, config => #{}};
default_appspec(emqx_message_transformation, _SuiteOpts) ->
#{schema_mod => emqx_message_transformation_schema, config => #{}};
default_appspec(emqx_ds_shared_sub, _SuiteOpts) ->
#{schema_mod => emqx_ds_shared_sub_schema, config => #{}};
default_appspec(_, _) ->
#{}.
@ -519,3 +523,14 @@ upgrade_raw_conf(Conf) ->
ce ->
emqx_conf_schema:upgrade_raw_conf(Conf)
end.
skip_if_oss() ->
try emqx_release:edition() of
ee ->
false;
_ ->
{skip, not_supported_in_oss}
catch
error:undef ->
{skip, standalone_not_supported}
end.

View File

@ -56,6 +56,8 @@ t_exclusive_sub(_) ->
{ok, _} = emqtt:connect(C1),
?CHECK_SUB(C1, 0),
?CHECK_SUB(C1, 0),
{ok, C2} = emqtt:start_link([
{clientid, <<"client2">>},
{clean_start, false},

View File

@ -63,6 +63,7 @@ groups() ->
t_parse_malformed_properties,
t_malformed_connect_header,
t_malformed_connect_data,
t_malformed_connect_data_proto_ver,
t_reserved_connect_flag,
t_invalid_clientid,
t_undefined_password,
@ -167,6 +168,8 @@ t_parse_malformed_utf8_string(_) ->
ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}),
?ASSERT_FRAME_THROW(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
%% TODO: parse v3 with 0 length clientid
t_serialize_parse_v3_connect(_) ->
Bin =
<<16, 37, 0, 6, 77, 81, 73, 115, 100, 112, 3, 2, 0, 60, 0, 23, 109, 111, 115, 113, 112, 117,
@ -324,7 +327,7 @@ t_serialize_parse_bridge_connect(_) ->
header = #mqtt_packet_header{type = ?CONNECT},
variable = #mqtt_packet_connect{
clientid = <<"C_00:0C:29:2B:77:52">>,
proto_ver = 16#03,
proto_ver = ?MQTT_PROTO_V3,
proto_name = <<"MQIsdp">>,
is_bridge = true,
will_retain = true,
@ -686,15 +689,36 @@ t_malformed_connect_header(_) ->
).
t_malformed_connect_data(_) ->
ProtoNameWithLen = <<0, 6, "MQIsdp">>,
ConnectFlags = <<2#00000000>>,
ClientIdwithLen = <<0, 1, "a">>,
UnexpectedRestBin = <<0, 1, 2>>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, unexpected_trailing_bytes := _},
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 0, 0, 0, 0, 0, 0>>)
#{cause := malformed_connect, unexpected_trailing_bytes := 3},
emqx_frame:parse(
<<16, 18, ProtoNameWithLen/binary, ?MQTT_PROTO_V3, ConnectFlags/binary, 0, 0,
ClientIdwithLen/binary, UnexpectedRestBin/binary>>
)
).
t_malformed_connect_data_proto_ver(_) ->
Proto3NameWithLen = <<0, 6, "MQIsdp">>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, header_bytes := <<>>},
emqx_frame:parse(<<16, 8, Proto3NameWithLen/binary>>)
),
ProtoNameWithLen = <<0, 4, "MQTT">>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, header_bytes := <<>>},
emqx_frame:parse(<<16, 6, ProtoNameWithLen/binary>>)
).
t_reserved_connect_flag(_) ->
?assertException(
throw,
{frame_parse_error, reserved_connect_flag},
{frame_parse_error, #{
cause := reserved_connect_flag, proto_ver := ?MQTT_PROTO_V3, proto_name := <<"MQIsdp">>
}},
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 1, 0, 0, 1, 0, 0>>)
).
@ -726,7 +750,7 @@ t_undefined_password(_) ->
},
variable = #mqtt_packet_connect{
proto_name = <<"MQTT">>,
proto_ver = 4,
proto_ver = ?MQTT_PROTO_V4,
is_bridge = false,
clean_start = true,
will_flag = false,
@ -774,7 +798,9 @@ t_invalid_will_retain(_) ->
54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>,
?assertException(
throw,
{frame_parse_error, invalid_will_retain},
{frame_parse_error, #{
cause := invalid_will_retain, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBin)
),
ok.
@ -796,22 +822,30 @@ t_invalid_will_qos(_) ->
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3))
),
ok.

View File

@ -26,6 +26,7 @@
%% Have to use real msgs, as the schema is guarded by enum.
-define(THROTTLE_MSG, authorization_permission_denied).
-define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized).
-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error).
-define(TIME_WINDOW, <<"1s">>).
all() -> emqx_common_test_helpers:all(?MODULE).
@ -59,6 +60,11 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
emqx_config:delete_override_conf_files().
init_per_testcase(t_throttle_recoverable_msg, Config) ->
ok = snabbkaffe:start_trace(),
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}),
Config;
init_per_testcase(t_throttle_add_new_msg, Config) ->
ok = snabbkaffe:start_trace(),
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(t_throttle_recoverable_msg, _Config) ->
ok = snabbkaffe:stop(),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
ok;
end_per_testcase(t_throttle_add_new_msg, _Config) ->
ok = snabbkaffe:stop(),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
@ -101,8 +111,8 @@ t_throttle(_Config) ->
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -115,14 +125,48 @@ t_throttle(_Config) ->
[]
).
t_throttle_recoverable_msg(_Config) ->
ResourceId = <<"resource_id">>,
ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]),
?check_trace(
begin
%% Warm-up and block to increase the probability that next events
%% will be in the same throttling time window.
{ok, _} = ?block_until(
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG},
5000
),
{_, {ok, _}} = ?wait_async_action(
events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId),
#{
?snk_kind := log_throttler_dropped,
throttled_msg := ThrottledMsg
},
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
throttled_msg := ThrottledMsg,
dropped_count := 1
},
3000
)
end,
[]
).
t_throttle_add_new_msg(_Config) ->
?check_trace(
begin
{ok, _} = ?block_until(
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -137,10 +181,15 @@ t_throttle_add_new_msg(_Config) ->
t_throttle_no_msg(_Config) ->
%% Must simply pass with no crashes
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
timer:sleep(10),
?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
Pid = erlang:whereis(emqx_log_throttler),
?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
%% assert process is not restarted
?assertEqual(Pid, erlang:whereis(emqx_log_throttler)),
%% make a gen_call to ensure the process is alive
%% note: this call result in an 'unexpected_call' error log.
?assertEqual(ignored, gen_server:call(Pid, probe)),
ok.
t_update_time_window(_Config) ->
?check_trace(
@ -168,8 +217,8 @@ t_throttle_debug_primary_level(_Config) ->
#{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG},
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -187,10 +236,13 @@ t_throttle_debug_primary_level(_Config) ->
%%--------------------------------------------------------------------
events(Msg) ->
events(100, Msg).
events(100, Msg, undefined).
events(N, Msg) ->
[emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)].
events(Msg, Id) ->
events(100, Msg, Id).
events(N, Msg, Id) ->
[emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)].
module_exists(Mod) ->
case erlang:module_loaded(Mod) of

View File

@ -377,42 +377,60 @@ t_will_msg(_) ->
t_format(_) ->
io:format("~ts", [
emqx_packet:format(#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0},
variable = undefined
})
]),
io:format("~ts", [
emqx_packet:format(#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK}, variable = 1, payload = <<"payload">>
})
emqx_packet:format(
#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0},
variable = undefined
},
text
)
]),
io:format(
"~ts",
[
emqx_packet:format(
#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK},
variable = 1,
payload = <<"payload">>
},
text
)
]
),
io:format("~ts", [
emqx_packet:format(
?CONNECT_PACKET(#mqtt_packet_connect{
will_flag = true,
will_retain = true,
will_qos = ?QOS_2,
will_topic = <<"topic">>,
will_payload = <<"payload">>
})
?CONNECT_PACKET(
#mqtt_packet_connect{
will_flag = true,
will_retain = true,
will_qos = ?QOS_2,
will_topic = <<"topic">>,
will_payload = <<"payload">>
}
),
text
)
]),
io:format("~ts", [
emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}))
emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}), text)
]),
io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99))]),
io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER), text)]),
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1), text)]),
io:format("~ts", [
emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))
emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>), text)
]),
io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90))]),
io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128))]).
io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98), text)]),
io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99), text)]),
io:format("~ts", [
emqx_packet:format(
?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]), text
)
]),
io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]), text)]),
io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]), text)]),
io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90), text)]),
io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128), text)]).
t_parse_empty_publish(_) ->
%% 52: 0011(type=PUBLISH) 0100 (QoS=2)

View File

@ -573,7 +573,7 @@ app_specs(Opts) ->
cluster() ->
ExtraConf = "\n durable_storage.messages.n_sites = 2",
Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})},
Spec = #{apps => app_specs(#{extra_emqx_conf => ExtraConf})},
[
{persistent_messages_SUITE1, Spec},
{persistent_messages_SUITE2, Spec}

View File

@ -64,18 +64,28 @@ init_per_group(routing_schema_v2, Config) ->
init_per_group(batch_sync_on, Config) ->
[{emqx_config, "broker.routing.batch_sync.enable_on = all"} | Config];
init_per_group(batch_sync_replicants, Config) ->
[{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config];
case emqx_cth_suite:skip_if_oss() of
false ->
[{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config];
True ->
True
end;
init_per_group(batch_sync_off, Config) ->
[{emqx_config, "broker.routing.batch_sync.enable_on = none"} | Config];
init_per_group(cluster, Config) ->
WorkDir = emqx_cth_suite:work_dir(Config),
NodeSpecs = [
{emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(1, Config)], role => core}},
{emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(2, Config)], role => core}},
{emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}}
],
Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
[{cluster, Nodes} | Config];
case emqx_cth_suite:skip_if_oss() of
false ->
WorkDir = emqx_cth_suite:work_dir(Config),
NodeSpecs = [
{emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(1, Config)], role => core}},
{emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(2, Config)], role => core}},
{emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}}
],
Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
[{cluster, Nodes} | Config];
True ->
True
end;
init_per_group(GroupName, Config) when
GroupName =:= single_batch_on;
GroupName =:= single

View File

@ -1247,7 +1247,7 @@ recv_msgs(Count, Msgs) ->
start_peer(Name, Port) ->
{ok, Node} = emqx_cth_peer:start_link(
Name,
ebin_path()
emqx_common_test_helpers:ebin_path()
),
pong = net_adm:ping(Node),
setup_node(Node, Port),
@ -1261,9 +1261,6 @@ host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
ebin_path() ->
["-pa" | code:get_path()].
setup_node(Node, Port) ->
EnvHandler =
fun(_) ->

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth, [
{description, "EMQX Authentication and authorization"},
{vsn, "0.3.3"},
{vsn, "0.3.4"},
{modules, []},
{registered, [emqx_auth_sup]},
{applications, [

View File

@ -203,6 +203,7 @@ common_fields() ->
enable(type) -> boolean();
enable(default) -> true;
enable(importance) -> ?IMPORTANCE_NO_DOC;
enable(desc) -> ?DESC(?FUNCTION_NAME);
enable(_) -> undefined.

View File

@ -477,9 +477,15 @@ authorize_deny(
sources()
) ->
authz_result().
authorize(Client, PubSub, Topic, _DefaultResult, Sources) ->
authorize(#{username := Username} = Client, PubSub, Topic, _DefaultResult, Sources) ->
case maps:get(is_superuser, Client, false) of
true ->
?tp(authz_skipped, #{reason => client_is_superuser, action => PubSub}),
?TRACE("AUTHZ", "authorization_skipped_as_superuser", #{
username => Username,
topic => Topic,
action => emqx_access_control:format_action(PubSub)
}),
emqx_metrics:inc(?METRIC_SUPERUSER),
{stop, #{result => allow, from => superuser}};
false ->

View File

@ -170,7 +170,12 @@ api_authz_refs() ->
authz_common_fields(Type) ->
[
{type, ?HOCON(Type, #{required => true, desc => ?DESC(type)})},
{enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}
{enable,
?HOCON(boolean(), #{
default => true,
importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(enable)
})}
].
source_types() ->

View File

@ -674,5 +674,77 @@ t_publish_last_will_testament_banned_client_connecting(_Config) ->
ok.
t_sikpped_as_superuser(_Config) ->
ClientInfo = #{
clientid => <<"clientid">>,
username => <<"username">>,
peerhost => {127, 0, 0, 1},
zone => default,
listener => {tcp, default},
is_superuser => true
},
?check_trace(
begin
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_0), <<"p/t/0">>)
),
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_1), <<"p/t/1">>)
),
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_2), <<"p/t/2">>)
),
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_0), <<"s/t/0">>)
),
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_1), <<"s/t/1">>)
),
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_2), <<"s/t/2">>)
)
end,
fun(Trace) ->
?assertMatch(
[
#{
reason := client_is_superuser,
action := #{qos := ?QOS_0, action_type := publish}
},
#{
reason := client_is_superuser,
action := #{qos := ?QOS_1, action_type := publish}
},
#{
reason := client_is_superuser,
action := #{qos := ?QOS_2, action_type := publish}
},
#{
reason := client_is_superuser,
action := #{qos := ?QOS_0, action_type := subscribe}
},
#{
reason := client_is_superuser,
action := #{qos := ?QOS_1, action_type := subscribe}
},
#{
reason := client_is_superuser,
action := #{qos := ?QOS_2, action_type := subscribe}
}
],
?of_kind(authz_skipped, Trace)
),
ok
end
),
ok = snabbkaffe:stop().
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).

View File

@ -118,8 +118,8 @@ mk_cluster_spec(Opts) ->
Node1Apps = Apps ++ [{emqx_dashboard, "dashboard.listeners.http {enable=true,bind=18083}"}],
Node2Apps = Apps,
[
{emqx_authz_api_cluster_SUITE1, Opts#{role => core, apps => Node1Apps}},
{emqx_authz_api_cluster_SUITE2, Opts#{role => core, apps => Node2Apps}}
{emqx_authz_api_cluster_SUITE1, Opts#{apps => Node1Apps}},
{emqx_authz_api_cluster_SUITE2, Opts#{apps => Node2Apps}}
].
request(Method, URL, Body, Config) ->

View File

@ -31,4 +31,6 @@
-define(AUTHN_TYPE, {?AUTHN_MECHANISM, ?AUTHN_BACKEND}).
-define(AUTHN_TYPE_SCRAM, {?AUTHN_MECHANISM_SCRAM, ?AUTHN_BACKEND}).
-define(AUTHN_DATA_FIELDS, [is_superuser, client_attrs, expire_at, acl]).
-endif.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_http, [
{description, "EMQX External HTTP API Authentication and Authorization"},
{vsn, "0.3.0"},
{vsn, "0.3.1"},
{registered, []},
{mod, {emqx_auth_http_app, []}},
{applications, [

View File

@ -25,7 +25,7 @@
start(_StartType, _StartArgs) ->
ok = emqx_authz:register_source(?AUTHZ_TYPE, emqx_authz_http),
ok = emqx_authn:register_provider(?AUTHN_TYPE, emqx_authn_http),
ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_http),
ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_restapi),
{ok, Sup} = emqx_auth_http_sup:start_link(),
{ok, Sup}.

View File

@ -32,7 +32,9 @@
with_validated_config/2,
generate_request/2,
request_for_log/2,
response_for_log/1
response_for_log/1,
extract_auth_data/2,
safely_parse_body/2
]).
-define(DEFAULT_CONTENT_TYPE, <<"application/json">>).
@ -194,34 +196,14 @@ handle_response(Headers, Body) ->
case safely_parse_body(ContentType, Body) of
{ok, NBody} ->
body_to_auth_data(NBody);
{error, Reason} ->
?TRACE_AUTHN_PROVIDER(
error,
"parse_http_response_failed",
#{content_type => ContentType, body => Body, reason => Reason}
),
{error, _Reason} ->
ignore
end.
body_to_auth_data(Body) ->
case maps:get(<<"result">>, Body, <<"ignore">>) of
<<"allow">> ->
IsSuperuser = emqx_authn_utils:is_superuser(Body),
Attrs = emqx_authn_utils:client_attrs(Body),
try
ExpireAt = expire_at(Body),
ACL = acl(ExpireAt, Body),
Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
{ok, Result}
catch
throw:{bad_acl_rule, Reason} ->
%% it's a invalid token, so ok to log
?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}),
{error, bad_username_or_password};
throw:Reason ->
?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}),
{error, bad_username_or_password}
end;
extract_auth_data(http, Body);
<<"deny">> ->
{error, not_authorized};
<<"ignore">> ->
@ -230,6 +212,24 @@ body_to_auth_data(Body) ->
ignore
end.
extract_auth_data(Source, Body) ->
IsSuperuser = emqx_authn_utils:is_superuser(Body),
Attrs = emqx_authn_utils:client_attrs(Body),
try
ExpireAt = expire_at(Body),
ACL = acl(ExpireAt, Source, Body),
Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
{ok, Result}
catch
throw:{bad_acl_rule, Reason} ->
%% it's a invalid token, so ok to log
?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}),
{error, bad_username_or_password};
throw:Reason ->
?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}),
{error, bad_username_or_password}
end.
merge_maps([]) -> #{};
merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)).
@ -268,40 +268,43 @@ expire_sec(#{<<"expire_at">> := _}) ->
expire_sec(_) ->
undefined.
acl(#{expire_at := ExpireTimeMs}, #{<<"acl">> := Rules}) ->
acl(#{expire_at := ExpireTimeMs}, Source, #{<<"acl">> := Rules}) ->
#{
acl => #{
source_for_logging => http,
source_for_logging => Source,
rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules),
%% It's seconds level precision (like JWT) for authz
%% see emqx_authz_client_info:check/1
expire => erlang:convert_time_unit(ExpireTimeMs, millisecond, second)
}
};
acl(_NoExpire, #{<<"acl">> := Rules}) ->
acl(_NoExpire, Source, #{<<"acl">> := Rules}) ->
#{
acl => #{
source_for_logging => http,
source_for_logging => Source,
rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules)
}
};
acl(_, _) ->
acl(_, _, _) ->
#{}.
safely_parse_body(ContentType, Body) ->
try
parse_body(ContentType, Body)
catch
_Class:_Reason ->
_Class:Reason ->
?TRACE_AUTHN_PROVIDER(
error,
"parse_http_response_failed",
#{content_type => ContentType, body => Body, reason => Reason}
),
{error, invalid_body}
end.
parse_body(<<"application/json", _/binary>>, Body) ->
{ok, emqx_utils_json:decode(Body, [return_maps])};
parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) ->
Flags = [<<"result">>, <<"is_superuser">>],
RawMap = maps:from_list(cow_qs:parse_qs(Body)),
NBody = maps:with(Flags, RawMap),
NBody = maps:from_list(cow_qs:parse_qs(Body)),
{ok, NBody};
parse_body(ContentType, _) ->
{error, {unsupported_content_type, ContentType}}.

View File

@ -2,10 +2,19 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_authn_scram_http).
%% Note:
%% This is not an implementation of the RFC 7804:
%% Salted Challenge Response HTTP Authentication Mechanism.
%% This backend is an implementation of scram,
%% which uses an external web resource as a source of user information.
-include_lib("emqx_auth/include/emqx_authn.hrl").
-module(emqx_authn_scram_restapi).
-feature(maybe_expr, enable).
-include("emqx_auth_http.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_auth/include/emqx_authn.hrl").
-behaviour(emqx_authn_provider).
@ -22,10 +31,6 @@
<<"salt">>
]).
-define(OPTIONAL_USER_INFO_KEYS, [
<<"is_superuser">>
]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
@ -72,7 +77,9 @@ authenticate(
reason => Reason
})
end,
emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State);
emqx_utils_scram:authenticate(
AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, ?AUTHN_DATA_FIELDS
);
authenticate(_Credential, _State) ->
ignore.
@ -95,7 +102,7 @@ retrieve(
) ->
Request = emqx_authn_http:generate_request(Credential#{username := Username}, State),
Response = emqx_resource:simple_sync_query(ResourceId, {Method, Request, RequestTimeout}),
?TRACE_AUTHN_PROVIDER("scram_http_response", #{
?TRACE_AUTHN_PROVIDER("scram_restapi_response", #{
request => emqx_authn_http:request_for_log(Credential, State),
response => emqx_authn_http:response_for_log(Response),
resource => ResourceId
@ -113,16 +120,11 @@ retrieve(
handle_response(Headers, Body) ->
ContentType = proplists:get_value(<<"content-type">>, Headers),
case safely_parse_body(ContentType, Body) of
{ok, NBody} ->
body_to_user_info(NBody);
{error, Reason} = Error ->
?TRACE_AUTHN_PROVIDER(
error,
"parse_scram_http_response_failed",
#{content_type => ContentType, body => Body, reason => Reason}
),
Error
maybe
{ok, NBody} ?= emqx_authn_http:safely_parse_body(ContentType, Body),
{ok, UserInfo} ?= body_to_user_info(NBody),
{ok, AuthData} ?= emqx_authn_http:extract_auth_data(scram_restapi, NBody),
{ok, maps:merge(AuthData, UserInfo)}
end.
body_to_user_info(Body) ->
@ -131,26 +133,16 @@ body_to_user_info(Body) ->
true ->
case safely_convert_hex(Required0) of
{ok, Required} ->
UserInfo0 = maps:merge(Required, maps:with(?OPTIONAL_USER_INFO_KEYS, Body)),
UserInfo1 = emqx_utils_maps:safe_atom_key_map(UserInfo0),
UserInfo = maps:merge(#{is_superuser => false}, UserInfo1),
{ok, UserInfo};
{ok, emqx_utils_maps:safe_atom_key_map(Required)};
Error ->
?TRACE_AUTHN_PROVIDER("decode_keys_failed", #{http_body => Body}),
Error
end;
_ ->
?TRACE_AUTHN_PROVIDER("bad_response_body", #{http_body => Body}),
?TRACE_AUTHN_PROVIDER("missing_requried_keys", #{http_body => Body}),
{error, bad_response}
end.
safely_parse_body(ContentType, Body) ->
try
parse_body(ContentType, Body)
catch
_Class:_Reason ->
{error, invalid_body}
end.
safely_convert_hex(Required) ->
try
{ok,
@ -165,15 +157,5 @@ safely_convert_hex(Required) ->
{error, Reason}
end.
parse_body(<<"application/json", _/binary>>, Body) ->
{ok, emqx_utils_json:decode(Body, [return_maps])};
parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) ->
Flags = ?REQUIRED_USER_INFO_KEYS ++ ?OPTIONAL_USER_INFO_KEYS,
RawMap = maps:from_list(cow_qs:parse_qs(Body)),
NBody = maps:with(Flags, RawMap),
{ok, NBody};
parse_body(ContentType, _) ->
{error, {unsupported_content_type, ContentType}}.
merge_scram_conf(Conf, State) ->
maps:merge(maps:with([algorithm, iteration_count], Conf), State).

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_authn_scram_http_schema).
-module(emqx_authn_scram_restapi_schema).
-behaviour(emqx_authn_schema).
@ -22,16 +22,16 @@
namespace() -> "authn".
refs() ->
[?R_REF(scram_http_get), ?R_REF(scram_http_post)].
[?R_REF(scram_restapi_get), ?R_REF(scram_restapi_post)].
select_union_member(
#{<<"mechanism">> := ?AUTHN_MECHANISM_SCRAM_BIN, <<"backend">> := ?AUTHN_BACKEND_BIN} = Value
) ->
case maps:get(<<"method">>, Value, undefined) of
<<"get">> ->
[?R_REF(scram_http_get)];
[?R_REF(scram_restapi_get)];
<<"post">> ->
[?R_REF(scramm_http_post)];
[?R_REF(scram_restapi_post)];
Else ->
throw(#{
reason => "unknown_http_method",
@ -43,20 +43,20 @@ select_union_member(
select_union_member(_Value) ->
undefined.
fields(scram_http_get) ->
fields(scram_restapi_get) ->
[
{method, #{type => get, required => true, desc => ?DESC(emqx_authn_http_schema, method)}},
{headers, fun emqx_authn_http_schema:headers_no_content_type/1}
] ++ common_fields();
fields(scram_http_post) ->
fields(scram_restapi_post) ->
[
{method, #{type => post, required => true, desc => ?DESC(emqx_authn_http_schema, method)}},
{headers, fun emqx_authn_http_schema:headers/1}
] ++ common_fields().
desc(scram_http_get) ->
desc(scram_restapi_get) ->
?DESC(emqx_authn_http_schema, get);
desc(scram_http_post) ->
desc(scram_restapi_post) ->
?DESC(emqx_authn_http_schema, post);
desc(_) ->
undefined.

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_authn_scram_http_SUITE).
-module(emqx_authn_scram_restapi_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -21,6 +21,9 @@
-define(ALGORITHM_STR, <<"sha512">>).
-define(ITERATION_COUNT, 4096).
-define(T_ACL_USERNAME, <<"username">>).
-define(T_ACL_PASSWORD, <<"password">>).
-include_lib("emqx/include/emqx_placeholder.hrl").
all() ->
@ -54,11 +57,11 @@ init_per_testcase(_Case, Config) ->
[authentication],
?GLOBAL
),
{ok, _} = emqx_authn_scram_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH),
{ok, _} = emqx_authn_scram_restapi_test_server:start_link(?HTTP_PORT, ?HTTP_PATH),
Config.
end_per_testcase(_Case, _Config) ->
ok = emqx_authn_scram_http_test_server:stop().
ok = emqx_authn_scram_restapi_test_server:stop().
%%------------------------------------------------------------------------------
%% Tests
@ -72,7 +75,9 @@ t_create(_Config) ->
{create_authenticator, ?GLOBAL, AuthConfig}
),
{ok, [#{provider := emqx_authn_scram_http}]} = emqx_authn_chains:list_authenticators(?GLOBAL).
{ok, [#{provider := emqx_authn_scram_restapi}]} = emqx_authn_chains:list_authenticators(
?GLOBAL
).
t_create_invalid(_Config) ->
AuthConfig = raw_config(),
@ -118,59 +123,8 @@ t_authenticate(_Config) ->
ok = emqx_config:put([mqtt, idle_timeout], 500),
{ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
ClientFirstMessage = esasl_scram:client_first_message(Username),
ConnectPacket = ?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties = #{
'Authentication-Method' => <<"SCRAM-SHA-512">>,
'Authentication-Data' => ClientFirstMessage
}
}
),
ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket),
%% Intentional sleep to trigger idle timeout for the connection not yet authenticated
ok = ct:sleep(1000),
?AUTH_PACKET(
?RC_CONTINUE_AUTHENTICATION,
#{'Authentication-Data' := ServerFirstMessage}
) = receive_packet(),
{continue, ClientFinalMessage, ClientCache} =
esasl_scram:check_server_first_message(
ServerFirstMessage,
#{
client_first_message => ClientFirstMessage,
password => Password,
algorithm => ?ALGORITHM
}
),
AuthContinuePacket = ?AUTH_PACKET(
?RC_CONTINUE_AUTHENTICATION,
#{
'Authentication-Method' => <<"SCRAM-SHA-512">>,
'Authentication-Data' => ClientFinalMessage
}
),
ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket),
?CONNACK_PACKET(
?RC_SUCCESS,
_,
#{'Authentication-Data' := ServerFinalMessage}
) = receive_packet(),
ok = esasl_scram:check_server_final_message(
ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM}
).
{ok, Pid} = create_connection(Username, Password),
emqx_authn_mqtt_test_client:stop(Pid).
t_authenticate_bad_props(_Config) ->
Username = <<"u">>,
@ -314,6 +268,47 @@ t_destroy(_Config) ->
_
) = receive_packet().
t_acl(_Config) ->
init_auth(),
ACL = emqx_authn_http_SUITE:acl_rules(),
set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{acl => ACL}),
{ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD),
Cases = [
{allow, <<"http-authn-acl/#">>},
{deny, <<"http-authn-acl/1">>},
{deny, <<"t/#">>}
],
try
lists:foreach(
fun(Case) ->
test_acl(Case, Pid)
end,
Cases
)
after
ok = emqx_authn_mqtt_test_client:stop(Pid)
end.
t_auth_expire(_Config) ->
init_auth(),
ExpireSec = 3,
WaitTime = timer:seconds(ExpireSec + 1),
ACL = emqx_authn_http_SUITE:acl_rules(),
set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{
acl => ACL,
expire_at =>
erlang:system_time(second) + ExpireSec
}),
{ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD),
timer:sleep(WaitTime),
?assertEqual(false, erlang:is_process_alive(Pid)).
t_is_superuser() ->
State = init_auth(),
ok = test_is_superuser(State, false),
@ -324,12 +319,12 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
Username = <<"u">>,
Password = <<"p">>,
set_user_handler(Username, Password, ExpectedIsSuperuser),
set_user_handler(Username, Password, #{is_superuser => ExpectedIsSuperuser}),
ClientFirstMessage = esasl_scram:client_first_message(Username),
{continue, ServerFirstMessage, ServerCache} =
emqx_authn_scram_http:authenticate(
emqx_authn_scram_restapi:authenticate(
#{
auth_method => <<"SCRAM-SHA-512">>,
auth_data => ClientFirstMessage,
@ -349,7 +344,7 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
),
{ok, UserInfo1, ServerFinalMessage} =
emqx_authn_scram_http:authenticate(
emqx_authn_scram_restapi:authenticate(
#{
auth_method => <<"SCRAM-SHA-512">>,
auth_data => ClientFinalMessage,
@ -382,24 +377,25 @@ raw_config() ->
}.
set_user_handler(Username, Password) ->
set_user_handler(Username, Password, false).
set_user_handler(Username, Password, IsSuperuser) ->
set_user_handler(Username, Password, #{is_superuser => false}).
set_user_handler(Username, Password, Extra0) ->
%% HTTP Server
Handler = fun(Req0, State) ->
#{
username := Username
} = cowboy_req:match_qs([username], Req0),
UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT, IsSuperuser),
UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT),
Extra = maps:merge(#{is_superuser => false}, Extra0),
Req = cowboy_req:reply(
200,
#{<<"content-type">> => <<"application/json">>},
emqx_utils_json:encode(UserInfo),
emqx_utils_json:encode(maps:merge(Extra, UserInfo)),
Req0
),
{ok, Req, State}
end,
ok = emqx_authn_scram_http_test_server:set_handler(Handler).
ok = emqx_authn_scram_restapi_test_server:set_handler(Handler).
init_auth() ->
init_auth(raw_config()).
@ -413,7 +409,7 @@ init_auth(Config) ->
{ok, [#{state := State}]} = emqx_authn_chains:list_authenticators(?GLOBAL),
State.
make_user_info(Password, Algorithm, IterationCount, IsSuperuser) ->
make_user_info(Password, Algorithm, IterationCount) ->
{StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(
Password,
#{
@ -424,8 +420,7 @@ make_user_info(Password, Algorithm, IterationCount, IsSuperuser) ->
#{
stored_key => binary:encode_hex(StoredKey),
server_key => binary:encode_hex(ServerKey),
salt => binary:encode_hex(Salt),
is_superuser => IsSuperuser
salt => binary:encode_hex(Salt)
}.
receive_packet() ->
@ -436,3 +431,79 @@ receive_packet() ->
after 1000 ->
ct:fail("Deliver timeout")
end.
create_connection(Username, Password) ->
{ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
ClientFirstMessage = esasl_scram:client_first_message(Username),
ConnectPacket = ?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties = #{
'Authentication-Method' => <<"SCRAM-SHA-512">>,
'Authentication-Data' => ClientFirstMessage
}
}
),
ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket),
%% Intentional sleep to trigger idle timeout for the connection not yet authenticated
ok = ct:sleep(1000),
?AUTH_PACKET(
?RC_CONTINUE_AUTHENTICATION,
#{'Authentication-Data' := ServerFirstMessage}
) = receive_packet(),
{continue, ClientFinalMessage, ClientCache} =
esasl_scram:check_server_first_message(
ServerFirstMessage,
#{
client_first_message => ClientFirstMessage,
password => Password,
algorithm => ?ALGORITHM
}
),
AuthContinuePacket = ?AUTH_PACKET(
?RC_CONTINUE_AUTHENTICATION,
#{
'Authentication-Method' => <<"SCRAM-SHA-512">>,
'Authentication-Data' => ClientFinalMessage
}
),
ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket),
?CONNACK_PACKET(
?RC_SUCCESS,
_,
#{'Authentication-Data' := ServerFinalMessage}
) = receive_packet(),
ok = esasl_scram:check_server_final_message(
ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM}
),
{ok, Pid}.
test_acl({allow, Topic}, C) ->
?assertMatch(
[0],
send_subscribe(C, Topic)
);
test_acl({deny, Topic}, C) ->
?assertMatch(
[?RC_NOT_AUTHORIZED],
send_subscribe(C, Topic)
).
send_subscribe(Client, Topic) ->
TopicOpts = #{nl => 0, rap => 0, rh => 0, qos => 0},
Packet = ?SUBSCRIBE_PACKET(1, [{Topic, TopicOpts}]),
emqx_authn_mqtt_test_client:send(Client, Packet),
timer:sleep(200),
?SUBACK_PACKET(1, ReasonCode) = receive_packet(),
ReasonCode.

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_authn_scram_http_test_server).
-module(emqx_authn_scram_restapi_test_server).
-behaviour(supervisor).
-behaviour(cowboy_handler).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_jwt, [
{description, "EMQX JWT Authentication and Authorization"},
{vsn, "0.3.2"},
{vsn, "0.3.3"},
{registered, []},
{mod, {emqx_auth_jwt_app, []}},
{applications, [

View File

@ -21,6 +21,7 @@
-include_lib("emqx_auth/include/emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(LDAP_HOST, "ldap").
-define(LDAP_DEFAULT_PORT, 389).
@ -46,13 +47,6 @@ init_per_suite(Config) ->
Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_ldap], #{
work_dir => ?config(priv_dir, Config)
}),
{ok, _} = emqx_resource:create_local(
?LDAP_RESOURCE,
?AUTHN_RESOURCE_GROUP,
emqx_ldap,
ldap_config(),
#{}
),
[{apps, Apps} | Config];
false ->
{skip, no_ldap}
@ -63,7 +57,6 @@ end_per_suite(Config) ->
[authentication],
?GLOBAL
),
ok = emqx_resource:remove_local(?LDAP_RESOURCE),
ok = emqx_cth_suite:stop(?config(apps, Config)).
%%------------------------------------------------------------------------------
@ -128,6 +121,87 @@ t_create_invalid(_Config) ->
InvalidConfigs
).
t_authenticate_timeout_cause_reconnect(_Config) ->
TestPid = self(),
meck:new(eldap, [non_strict, no_link, passthrough]),
try
%% cause eldap process to be killed
meck:expect(
eldap,
search,
fun
(Pid, [{base, <<"uid=mqttuser0007", _/binary>>} | _]) ->
TestPid ! {eldap_pid, Pid},
{error, {gen_tcp_error, timeout}};
(Pid, Args) ->
meck:passthrough([Pid, Args])
end
),
Credentials = fun(Username) ->
#{
username => Username,
password => Username,
listener => 'tcp:default',
protocol => mqtt
}
end,
SpecificConfigParams = #{},
Result = {ok, #{is_superuser => true}},
Timeout = 1000,
Config0 = raw_ldap_auth_config(),
Config = Config0#{
<<"pool_size">> => 1,
<<"request_timeout">> => Timeout
},
AuthConfig = maps:merge(Config, SpecificConfigParams),
{ok, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, AuthConfig}
),
%% 0006 is a disabled user
?assertEqual(
{error, user_disabled},
emqx_access_control:authenticate(Credentials(<<"mqttuser0006">>))
),
?assertEqual(
{error, not_authorized},
emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))
),
ok = wait_for_ldap_pid(1000),
[#{id := ResourceID}] = emqx_resource_manager:list_all(),
?retry(1_000, 10, {ok, connected} = emqx_resource_manager:health_check(ResourceID)),
%% turn back to normal
meck:expect(
eldap,
search,
2,
fun(Pid2, Query) ->
meck:passthrough([Pid2, Query])
end
),
%% expect eldap process to be restarted
?assertEqual(Result, emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))),
emqx_authn_test_lib:delete_authenticators(
[authentication],
?GLOBAL
)
after
meck:unload(eldap)
end.
wait_for_ldap_pid(After) ->
receive
{eldap_pid, Pid} ->
?assertNot(is_process_alive(Pid)),
ok
after After ->
error(timeout)
end.
t_authenticate(_Config) ->
ok = lists:foreach(
fun(Sample) ->
@ -300,6 +374,3 @@ user_seeds() ->
ldap_server() ->
iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])).
ldap_config() ->
emqx_ldap_SUITE:ldap_config([]).

View File

@ -44,7 +44,6 @@ init_per_suite(Config) ->
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
ok = create_ldap_resource(),
[{apps, Apps} | Config];
false ->
{skip, no_ldap}
@ -167,21 +166,8 @@ setup_config(SpecialParams) ->
ldap_server() ->
iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])).
ldap_config() ->
emqx_ldap_SUITE:ldap_config([]).
start_apps(Apps) ->
lists:foreach(fun application:ensure_all_started/1, Apps).
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).
create_ldap_resource() ->
{ok, _} = emqx_resource:create_local(
?LDAP_RESOURCE,
?AUTHZ_RESOURCE_GROUP,
emqx_ldap,
ldap_config(),
#{}
),
ok.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_mnesia, [
{description, "EMQX Buitl-in Database Authentication and Authorization"},
{vsn, "0.1.6"},
{vsn, "0.1.7"},
{registered, []},
{mod, {emqx_auth_mnesia_app, []}},
{applications, [

View File

@ -141,7 +141,9 @@ authenticate(
reason => Reason
})
end,
emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State);
emqx_utils_scram:authenticate(
AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, [is_superuser]
);
authenticate(_Credential, _State) ->
ignore.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_mongodb, [
{description, "EMQX MongoDB Authentication and Authorization"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{registered, []},
{mod, {emqx_auth_mongodb_app, []}},
{applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_mysql, [
{description, "EMQX MySQL Authentication and Authorization"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{registered, []},
{mod, {emqx_auth_mysql_app, []}},
{applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_postgresql, [
{description, "EMQX PostgreSQL Authentication and Authorization"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{registered, []},
{mod, {emqx_auth_postgresql_app, []}},
{applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_redis, [
{description, "EMQX Redis Authentication and Authorization"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{registered, []},
{mod, {emqx_auth_redis_app, []}},
{applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
{vsn, "0.2.3"},
{vsn, "0.2.4"},
{registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}},
{applications, [

View File

@ -123,6 +123,7 @@ common_bridge_fields() ->
boolean(),
#{
desc => ?DESC("desc_enable"),
importance => ?IMPORTANCE_NO_DOC,
default => true
}
)},

View File

@ -65,6 +65,7 @@
-export([
make_producer_action_schema/1, make_producer_action_schema/2,
make_consumer_action_schema/1, make_consumer_action_schema/2,
common_fields/0,
top_level_common_action_keys/0,
top_level_common_source_keys/0,
project_to_actions_resource_opts/1,
@ -507,16 +508,26 @@ make_consumer_action_schema(ParametersRef, Opts) ->
})}
].
common_schema(ParametersRef, _Opts) ->
common_fields() ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{enable,
mk(boolean(), #{
desc => ?DESC("config_enable"),
importance => ?IMPORTANCE_NO_DOC,
default => true
})},
{connector,
mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})},
{tags, emqx_schema:tags_schema()},
{description, emqx_schema:description_schema()},
{description, emqx_schema:description_schema()}
].
common_schema(ParametersRef, _Opts) ->
[
{parameters, ParametersRef}
| common_fields()
].
project_to_actions_resource_opts(OldResourceOpts) ->

View File

@ -1154,7 +1154,7 @@ t_bridges_probe(Config) ->
?assertMatch(
{ok, 400, #{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Connection refused">>
<<"message">> := <<"Connection refused", _/binary>>
}},
request_json(
post,

View File

@ -889,7 +889,8 @@ t_sync_query_down(Config, Opts) ->
),
?force_ordering(
#{?snk_kind := call_query},
#{?snk_kind := SNKKind} when
SNKKind =:= call_query orelse SNKKind =:= simple_query_enter,
#{?snk_kind := cut_connection, ?snk_span := start}
),
%% Note: order of arguments here is reversed compared to `?force_ordering'.
@ -913,6 +914,7 @@ t_sync_query_down(Config, Opts) ->
emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort)
)
end),
?tp("publishing_message", #{}),
try
{_, {ok, _}} =
snabbkaffe:wait_async_action(
@ -921,6 +923,7 @@ t_sync_query_down(Config, Opts) ->
infinity
)
after
?tp("healing_failure", #{}),
emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort)
end,
{ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity),

View File

@ -23,7 +23,7 @@ defmodule EMQXBridgeAzureEventHub.MixProject do
def deps() do
[
{:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
{:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_azure_event_hub, [
{description, "EMQX Enterprise Azure Event Hub Bridge"},
{vsn, "0.1.7"},
{vsn, "0.1.8"},
{registered, []},
{applications, [
kernel,

View File

@ -129,16 +129,7 @@ fields(actions) ->
override(
emqx_bridge_kafka:producer_opts(action),
bridge_v2_overrides()
) ++
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{connector,
mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})},
{tags, emqx_schema:tags_schema()},
{description, emqx_schema:description_schema()}
],
) ++ emqx_bridge_v2_schema:common_fields(),
override_documentations(Fields);
fields(Method) ->
Fields = emqx_bridge_kafka:fields(Method),

View File

@ -382,12 +382,31 @@ t_multiple_actions_sharing_topic(Config) ->
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.
t_dynamic_topics(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.

Some files were not shown because too many files have changed in this diff Show More