Compare commits

..

197 Commits

Author SHA1 Message Date
Ivan Dyachkov bcd63344b8
Merge pull request #13583 from id/20240807-sync-release-branches
sync release branches
2024-08-07 11:38:14 +02:00
Ivan Dyachkov cc3b26a3ac Merge remote-tracking branch 'upstream/release-58' into 20240807-sync-release-branches 2024-08-07 09:48:38 +02:00
Ivan Dyachkov dd686c24a0 Merge remote-tracking branch 'upstream/release-57' into 20240807-sync-release-branches 2024-08-07 09:44:38 +02:00
Ivan Dyachkov 592c4e0045
Merge pull request #12774 from emqx/dependabot/github_actions/dot-github/actions/package-macos/actions-package-macos-83d1e47aa6
chore(deps): bump the actions-package-macos group in /.github/actions/package-macos with 1 update
2024-08-07 09:43:53 +02:00
Ivan Dyachkov 073e3ea0a8
Merge pull request #13569 from emqx/dependabot/github_actions/actions-ef71aea555
chore(deps): bump the actions group across 1 directory with 8 updates
2024-08-07 09:35:52 +02:00
Xinyu Liu 81978ceaeb
Merge pull request #13571 from terry-xiaoyu/fast_fail_on_invalid_ssl_opts
chore: update esockd to 5.12.0
2024-08-07 11:21:32 +08:00
Ilia Averianov 6bfddd9952
Merge pull request #13565 from savonarola/0801-shared-subs-compact-structures
Reduce size of shared sub protocol structures
2024-08-06 19:56:08 +03:00
Thales Macedo Garitezi cf608a73a5
Merge pull request #13578 from thalesmg/20240806-r58-port-raft-precond
feat(dsraft): support atomic batches + preconditions (release-58)
2024-08-06 13:40:46 -03:00
Thales Macedo Garitezi a8200fb83d
Merge pull request #13579 from thalesmg/20240806-r58-test-flaky-consumer-rebalance
test: attempt to reduce test flakiness
2024-08-06 13:33:46 -03:00
Ilya Averyanov 9ad65c6ac1 feat(queue): reduce logging levels 2024-08-06 18:45:15 +03:00
Thales Macedo Garitezi 9ca3985bbd test: attempt to reduce test flakiness 2024-08-06 12:44:51 -03:00
Ilya Averyanov e17becb84d feat(queue): compact protocol structures, organize formatting 2024-08-06 18:05:02 +03:00
Andrew Mayorov 5dd8fefded test(ds): avoid side effects in check phase 2024-08-06 11:43:12 -03:00
Andrew Mayorov 7b85faf12a chore(dsraft): fix a few spelling errors
Co-Authored-By: Thales Macedo Garitezi <thalesmg@gmail.com>
2024-08-06 11:43:12 -03:00
Andrew Mayorov b0594271b2 chore(dsraft): fix a typespec 2024-08-06 11:43:12 -03:00
Andrew Mayorov d8aa39a310 fix(dsraft): use local application environment 2024-08-06 11:43:12 -03:00
Andrew Mayorov fc0434afc8 chore(dslocal): refine a few typespecs 2024-08-06 11:43:12 -03:00
Andrew Mayorov 5502af18b7 feat(ds): support deletions + precondition-related API in bitfield-lts 2024-08-06 11:43:12 -03:00
Andrew Mayorov 9f96e0957e test(ds): verify deletions work predictably 2024-08-06 11:43:12 -03:00
Andrew Mayorov 109ffe7a70 fix(dsbackend): unify timestamp resolution in operations / preconditions 2024-08-06 11:43:12 -03:00
Andrew Mayorov 1559aac486 test(dsbackend): add shared tests for atomic batches + preconditions 2024-08-06 11:43:12 -03:00
Andrew Mayorov 68990f1538 feat(ds): support operations + preconditions in skipstream-lts 2024-08-06 11:43:12 -03:00
Andrew Mayorov 5356d678cc feat(dsraft): support atomic batches + preconditions 2024-08-06 11:43:12 -03:00
Andrew Mayorov 11951f8f6c feat(ds): adapt buffer interface to `emqx_ds:operation()` 2024-08-06 11:43:12 -03:00
Andrew Mayorov 0aa4cdbaf3 feat(ds): add generic preconditions implementation 2024-08-06 11:43:12 -03:00
Ivan Dyachkov 281f8ddc83
Merge pull request #13575 from Kinplemelon/kinple/upgrade-dashboard-58
chore(dashboard): bump dashboard version to v1.10.0-beta.1 & e1.8.0-beta.1
2024-08-06 16:39:06 +02:00
Kinplemelon b80513e941 ci: update emqx docs link in dashboard 2024-08-06 15:21:19 +02:00
Ivan Dyachkov 822ed71282 chore: release 5.7.2 2024-08-06 13:25:56 +02:00
Kinple b8fd5de2a5
Merge pull request #13577 from Kinplemelon/kinple/upgrade-dashboard
chore(dashboard): bump dashboard version to v1.9.2 & e1.7.2
2024-08-06 19:02:50 +08:00
Kinplemelon 3ee84d60ae chore(dashboard): bump dashboard version to v1.9.2 & e1.7.2 2024-08-06 18:11:35 +08:00
Andrew Mayorov 3b52b658cd
Merge pull request #13559 from keynslug/feat/EMQX-12309/raft-precond
feat(dsraft): support atomic batches + preconditions
2024-08-06 09:17:16 +02:00
Kinplemelon cba3dcbeda chore(dashboard): bump dashboard version to v1.10.0-beta.1 & e1.8.0-beta.1 2024-08-06 13:44:16 +08:00
Kinple caf1897979
Merge pull request #13574 from Kinplemelon/kinple/upgrade-dashboard
chore(dashboard): bump dashboard version to e1.7.2-beta.7
2024-08-06 10:51:03 +08:00
Kinplemelon dbbd5e1458 ci: update emqx docs link in dashboard 2024-08-06 09:33:20 +08:00
Kinplemelon 0ab31df9d2 chore(dashboard): bump dashboard version to v1.9.2-beta.1 & e1.7.2-beta.7 2024-08-06 09:32:17 +08:00
Thales Macedo Garitezi 613fc644f5
Merge pull request #13425 from kjellwinblad/kjell/review_connector_error_logs_mqtt_etc/EMQX-12555/EMQX-12657
fix: make MQTT connector error log messages easier to understand
2024-08-05 17:34:13 -03:00
Andrew Mayorov b1a53568d6
test(ds): avoid side effects in check phase 2024-08-05 16:34:17 +02:00
Ivan Dyachkov d6651a1889
Merge pull request #13572 from id/20240805-prep-5.8.0-alpha.1
chore: prepare 5.8.0-alpha.1
2024-08-05 16:05:42 +02:00
Ivan Dyachkov 4cf7151139 chore: prepare 5.8.0-alpha.1 2024-08-05 11:09:07 +02:00
Ivan Dyachkov 4865999606 Merge remote-tracking branch 'upstream/master' into release-58 2024-08-05 10:59:59 +02:00
Andrew Mayorov 382feab7d1
chore(dsraft): fix a few spelling errors
Co-Authored-By: Thales Macedo Garitezi <thalesmg@gmail.com>
2024-08-05 10:55:49 +02:00
Andrew Mayorov 6aad774075
chore(dsraft): fix a typespec 2024-08-05 10:55:49 +02:00
Andrew Mayorov 649cbf1c79
fix(dsraft): use local application environment 2024-08-05 10:55:49 +02:00
Andrew Mayorov 4cde5e98a3
chore(dslocal): refine a few typespecs 2024-08-05 10:55:48 +02:00
Andrew Mayorov d631b5b296
feat(ds): support deletions + precondition-related API in bitfield-lts 2024-08-05 10:55:48 +02:00
Andrew Mayorov 26ec69d5f4
test(ds): verify deletions work predictably 2024-08-05 10:55:48 +02:00
Andrew Mayorov 58b9ab0210
fix(dsbackend): unify timestamp resolution in operations / preconditions 2024-08-05 10:55:22 +02:00
lafirest 4644072fd8
Merge pull request #13570 from lafirest/fix/api_key_bootstrap
fix(api_key): do not crash boot when the bootstrap file does not exist
2024-08-05 16:33:43 +08:00
Shawn bd87e3ce2b chore: update esockd to 5.12.0 2024-08-05 16:18:04 +08:00
firest c9c4d1a196 fix(api_key): do not crash boot when the bootstrap file does not exist 2024-08-05 15:56:05 +08:00
dependabot[bot] 11546b72f4
chore(deps): bump the actions group across 1 directory with 8 updates
Bumps the actions group with 8 updates in the / directory:

| Package | From | To |
| --- | --- | --- |
| [actions/checkout](https://github.com/actions/checkout) | `4.1.2` | `4.1.7` |
| [actions/upload-artifact](https://github.com/actions/upload-artifact) | `4.3.3` | `4.3.5` |
| [actions/download-artifact](https://github.com/actions/download-artifact) | `4.1.7` | `4.1.8` |
| [docker/setup-qemu-action](https://github.com/docker/setup-qemu-action) | `3.0.0` | `3.2.0` |
| [docker/setup-buildx-action](https://github.com/docker/setup-buildx-action) | `3.3.0` | `3.6.1` |
| [docker/login-action](https://github.com/docker/login-action) | `3.2.0` | `3.3.0` |
| [erlef/setup-beam](https://github.com/erlef/setup-beam) | `1.18.0` | `1.18.1` |
| [ossf/scorecard-action](https://github.com/ossf/scorecard-action) | `2.3.3` | `2.4.0` |



Updates `actions/checkout` from 4.1.2 to 4.1.7
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4.1.2...692973e3d937129bcbf40652eb9f2f61becf3332)

Updates `actions/upload-artifact` from 4.3.3 to 4.3.5
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](65462800fd...89ef406dd8)

Updates `actions/download-artifact` from 4.1.7 to 4.1.8
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](65a9edc588...fa0a91b85d)

Updates `docker/setup-qemu-action` from 3.0.0 to 3.2.0
- [Release notes](https://github.com/docker/setup-qemu-action/releases)
- [Commits](68827325e0...49b3bc8e6b)

Updates `docker/setup-buildx-action` from 3.3.0 to 3.6.1
- [Release notes](https://github.com/docker/setup-buildx-action/releases)
- [Commits](d70bba72b1...988b5a0280)

Updates `docker/login-action` from 3.2.0 to 3.3.0
- [Release notes](https://github.com/docker/login-action/releases)
- [Commits](0d4c9c5ea7...9780b0c442)

Updates `erlef/setup-beam` from 1.18.0 to 1.18.1
- [Release notes](https://github.com/erlef/setup-beam/releases)
- [Commits](a6e26b2231...b9c58b0450)

Updates `ossf/scorecard-action` from 2.3.3 to 2.4.0
- [Release notes](https://github.com/ossf/scorecard-action/releases)
- [Changelog](https://github.com/ossf/scorecard-action/blob/main/RELEASE.md)
- [Commits](dc50aa9510...62b2cac7ed)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: actions/upload-artifact
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: actions/download-artifact
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: docker/setup-qemu-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
- dependency-name: docker/setup-buildx-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
- dependency-name: docker/login-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
- dependency-name: erlef/setup-beam
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: ossf/scorecard-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-08-05 03:25:47 +00:00
dependabot[bot] bcb70a9fb9
chore(deps): bump the actions-package-macos group
Bumps the actions-package-macos group in /.github/actions/package-macos with 1 update: [actions/cache](https://github.com/actions/cache).


Updates `actions/cache` from 4.0.1 to 4.0.2
- [Release notes](https://github.com/actions/cache/releases)
- [Changelog](https://github.com/actions/cache/blob/main/RELEASES.md)
- [Commits](ab5e6d0c87...0c45773b62)

---
updated-dependencies:
- dependency-name: actions/cache
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions-package-macos
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-08-05 03:17:26 +00:00
JimMoen 09ec31908b
Merge pull request #13357 from JimMoen/fix-utf8-frame-error-connack
Stop returning `CONNACK` or `DISCONNECT` to clients that sent malformed CONNECT packets.

- Only send `CONNACK` with reason code `frame_too_large` for MQTT v5.0 when connecting, and only if the protocol version field in CONNECT can be detected.
- Otherwise **DO NOT** send any CONNACK or DISCONNECT packet.
2024-08-02 15:24:30 +08:00
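For illustration, a minimal Erlang sketch of just the `frame_too_large` rule described in the PR above, assuming a simplified reason map and a pre-detected protocol version (the real logic lives in `emqx_channel`'s `handle_frame_error/2`, shown in the diffs further down; module and return atoms here are hypothetical):

```
-module(frame_error_sketch).
-export([handle_connect_frame_error/2]).

-define(MQTT_PROTO_V5, 5).

%% During CONNECT, answer a too-large frame with CONNACK (reason code
%% 0x95, Packet Too Large) only when the client is detectably MQTT v5.
handle_connect_frame_error(#{cause := frame_too_large}, ?MQTT_PROTO_V5) ->
    {connack, rc_packet_too_large};
handle_connect_frame_error(_Reason, _ProtoVerOrUndefined) ->
    %% No CONNACK, no DISCONNECT for malformed or undetectable CONNECTs:
    %% just close the socket.
    close_socket.
```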
lafirest b94ec4014f
Merge pull request #13563 from lafirest/fix/payload_encode
fix(log): respect payload encoding settings when formatting packets
2024-08-02 14:38:12 +08:00
firest 74c346f9d1 fix(log): respect payload encoding settings when formatting packets 2024-08-02 12:41:30 +08:00
zhongwencool 8a33ef8576
Merge pull request #13562 from zhongwencool/fix-deactivate-alarm
fix: deactivate alarm before create resource
2024-08-02 12:08:27 +08:00
zhongwencool 6c2033ecbf fix: deactivate alarm before create resource 2024-08-02 11:03:59 +08:00
zmstone 51530588ef ci: fix a typo in commented out docker-compose yaml file 2024-08-01 22:41:42 +02:00
Thales Macedo Garitezi bba9d085d6 test: refactor test structure 2024-08-01 16:03:04 -03:00
Thales Macedo Garitezi 3162fe7a27 feat: prettify some error explanations 2024-08-01 15:31:00 -03:00
Thales Macedo Garitezi 52b2d73b28 test: move new test to newer module and use current apis 2024-08-01 15:13:25 -03:00
Thales Macedo Garitezi 44e7f2e9b2 refactor: use macros for status to avoid typos 2024-08-01 14:49:43 -03:00
Thales Macedo Garitezi baf2b96cbc test: refactor test structure 2024-08-01 14:27:25 -03:00
Kjell Winblad ba2d4f3df3 docs: add change log entry 2024-08-01 14:21:27 -03:00
Kjell Winblad 11aaa7b07d fix: make MQTT connector error log messages easier to understand
Fixes:
https://emqx.atlassian.net/browse/EMQX-12555
https://emqx.atlassian.net/browse/EMQX-12657
2024-08-01 14:21:26 -03:00
Thales Macedo Garitezi 4250d01363
Merge pull request #13546 from thalesmg/20240730-r58-pulsar-query-mode
feat: expose `resource_opts.query_mode` for pulsar action
2024-08-01 14:19:16 -03:00
Thales Macedo Garitezi 86853ac6ef
Merge pull request #13545 from thalesmg/20240730-m-connector-jwt-app
refactor: move JWT worker and helpers to separate app
2024-08-01 13:06:27 -03:00
Andrew Mayorov 810a4d3cf9
test(dsbackend): add shared tests for atomic batches + preconditions 2024-08-01 14:26:45 +02:00
Andrew Mayorov 7b243ef7ad
feat(ds): support operations + preconditions in skipstream-lts 2024-08-01 14:26:45 +02:00
Andrew Mayorov fcf76d28ba
feat(dsraft): support atomic batches + preconditions 2024-08-01 14:26:45 +02:00
Andrew Mayorov 3b5d98c1d9
feat(ds): adapt buffer interface to `emqx_ds:operation()` 2024-08-01 14:26:45 +02:00
Andrew Mayorov 451b03ff99
feat(ds): add generic preconditions implementation 2024-08-01 14:26:45 +02:00
JimMoen f792418a68
Merge pull request #13552 from JimMoen/fix-plugin-app-takes-too-long
fix: add a startup timeout limit for the plugin application
2024-08-01 16:46:09 +08:00
JimMoen 4915cc0da6
chore: add changelog entry for 13357 2024-08-01 15:23:58 +08:00
JimMoen 15b3f4deb0
fix: rm unused func and exports 2024-08-01 15:00:24 +08:00
JimMoen 7a251c9ead
test: handle frame error for CONNECT packets 2024-08-01 10:26:31 +08:00
JimMoen 37a89d0094
fix: enrich parse_state and connection serialize opts 2024-08-01 10:26:31 +08:00
JimMoen c313aa89f0
fix: try throw proto_ver and proto_name when parsing CONNECT packet 2024-08-01 10:26:31 +08:00
JimMoen 6db1c0a446
refactor: separate function to handle `frame_error` 2024-08-01 10:26:31 +08:00
JimMoen d4508a4f1d
chore: sync master `elvis.config` 2024-08-01 10:26:31 +08:00
Thales Macedo Garitezi a6a9538e73 refactor: move JWT worker and helpers to separate app
Some bridge applications might need to use JWTs before the `emqx_connector` is started, so
we must move JWT table initialization to a separate dependency application.
2024-07-31 14:52:12 -03:00
Thales Macedo Garitezi 9f97bff7d0 feat: expose `resource_opts.query_mode` for pulsar action
Fixes https://emqx.atlassian.net/browse/EMQX-12782
2024-07-31 11:13:11 -03:00
Ivan Dyachkov 577f1a7d8a
Merge pull request #13553 from id/20240731-ci-fix-docker-build
ci: fix docker images build
2024-07-31 16:04:47 +02:00
Ivan Dyachkov e42021d314
Merge pull request #13554 from id/20240731-sync-release-57
sync release-57
2024-07-31 15:48:37 +02:00
Thales Macedo Garitezi 08c58cc319
Merge pull request #13543 from thalesmg/20240730-r57-sr-delete-protobuf-cache
fix(schema registry): clear protobuf code cache when deleting/updating serdes
2024-07-31 10:16:48 -03:00
Thales Macedo Garitezi 150fee87f1
Merge pull request #13541 from thalesmg/20240730-r57-unset-crl-check-listener
fix(crl): force remove CRL fields from SSL opts after listener update
2024-07-31 10:16:35 -03:00
ieQu1 6058b50c91
Merge pull request #13555 from ieQu1/ds-rest-404
fix(mgmt): Return 404 for /ds/ API endpoints when DS is disabled
2024-07-31 14:57:17 +02:00
Thales Macedo Garitezi 85cff5e7eb fix: merge conflicts 2024-07-31 09:14:29 -03:00
ieQu1 569f48f5a1
fix(mgmt): Return 404 for /ds/ API endpoints when DS is disabled 2024-07-31 13:44:38 +02:00
ieQu1 2cf86e76ee
Merge pull request #13551 from ieQu1/EMQX-12587
fix(sessds): Expose durable sessions in the config API
2024-07-31 12:00:26 +02:00
Ivan Dyachkov 74cef7937d Merge remote-tracking branch 'upstream/release-57' into 20240731-sync-release-57 2024-07-31 11:31:29 +02:00
JimMoen c658cfe269
fix: make static_check happy 2024-07-31 17:17:13 +08:00
JimMoen a246551914
fix: add a startup timeout limit for the plugin application 2024-07-31 17:17:11 +08:00
JimMoen b1c8bc2421
Merge pull request #13548 from JimMoen/feat-plugin-on-config-changed-callback
feat: call plugin's app module `on_config_changed/2` callback
2024-07-31 16:40:48 +08:00
ieQu1 200b5ab294
Merge pull request #13550 from ieQu1/no-ra-dependency-on-oss
chore(emqx): Remove ra from the list of EMQX dependencies
2024-07-31 10:30:44 +02:00
Ivan Dyachkov 8d8ff6cf5d ci: fix docker images build
/etc/docker/daemon.json requires root for read access
2024-07-31 10:27:04 +02:00
ieQu1 a23b8266b1
fix(sessds): Expose durable sessions in the config API 2024-07-31 10:18:38 +02:00
ieQu1 d69342a2fc
chore(emqx): Remove ra from the list of EMQX dependencies 2024-07-31 09:56:28 +02:00
JimMoen e6bfc14cc9
fix: try-catch optional `on_config_changed/2` plugin app callback 2024-07-31 10:09:02 +08:00
JimMoen 3d1f0c756c
feat: call plugin's app module `on_config_changed/2` callback
assumes the plugin's app module is named `[PluginName]_app`
2024-07-31 10:09:02 +08:00
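A hedged sketch of that convention: for a plugin named `my_plugin`, the host would invoke `my_plugin_app:on_config_changed(OldConf, NewConf)` if exported (the exact argument shapes are assumptions here), wrapped in try-catch per the follow-up commit above so a crashing callback cannot break the config update.

```
-module(my_plugin_app).
-behaviour(application).
-behaviour(supervisor).
-export([start/2, stop/1, init/1, on_config_changed/2]).

start(_Type, _Args) -> supervisor:start_link(?MODULE, []).
stop(_State) -> ok.
init([]) -> {ok, {#{strategy => one_for_one}, []}}.

%% Optional callback, assumed to receive the old and new plugin config
%% as maps; errors are caught by the host.
on_config_changed(OldConf, NewConf) ->
    Changed = maps:filter(
        fun(Key, New) -> maps:get(Key, OldConf, undefined) =/= New end,
        NewConf
    ),
    logger:info("my_plugin config changed: ~p", [Changed]).
```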
Thales Macedo Garitezi 83041a8b83
Merge pull request #13544 from thalesmg/20240730-m-test-flaky-client-v2
test(clients v2 api): attempt to reduce flakiness
2024-07-30 16:07:37 -03:00
Thales Macedo Garitezi 1c4402b12c test(clients v2 api): attempt to reduce flakiness
https://github.com/emqx/emqx/actions/runs/10161391242/job/28101183920#step:6:331
2024-07-30 14:07:08 -03:00
Thales Macedo Garitezi ebb69f4ebf fix(crl): force remove crl fields from SSL opts after listener update
Fixes https://emqx.atlassian.net/browse/EMQX-12785
2024-07-30 14:00:24 -03:00
Thales Macedo Garitezi fd961f9da7 fix(schema registry): clear protobuf code cache when deleting/updating serde
Fixes https://emqx.atlassian.net/browse/EMQX-12789
2024-07-30 13:52:34 -03:00
Ilia Averianov 359bc38aa4
Merge pull request #13407 from savonarola/0701-shared-sub
Implement shared subscriptions
2024-07-30 16:12:13 +03:00
Ilya Averyanov 08f70e4a25 feat(queue): move ds shared sub dependent test to emqx_ds_shared_sub app 2024-07-30 14:19:39 +03:00
Ilya Averyanov e408804efb feat(queue): fix dialyzer issues 2024-07-30 13:01:48 +03:00
Ilya Averyanov e294d35703 feat(queue): add schema descriptions 2024-07-30 13:01:48 +03:00
Ilya Averyanov 303ff95e10 feat(queue): add stub for CRUD API 2024-07-30 13:01:48 +03:00
Ilya Averyanov 23f0e88b45 feat(queue): add integration with external broker 2024-07-30 13:01:46 +03:00
Ilya Averyanov f0dd1bc4f4 feat(queue): add shared sub support to the management API 2024-07-30 13:01:20 +03:00
Ilya Averyanov 9b30320ddb feat(queue): simplify progress report on disconnect 2024-07-30 13:01:20 +03:00
Ilya Averyanov cae27293a5 feat(queue): move route registration to sessions 2024-07-30 13:01:19 +03:00
Ilya Averyanov 81f4103d60 feat(queue): avoid cyclic dependencies 2024-07-30 13:01:19 +03:00
Ilya Averyanov bab526be24 feat(queue): self-revoke all shared streams on session open 2024-07-30 13:01:19 +03:00
Ilya Averyanov 9307a82004 feat(queue): rearrange leader's code 2024-07-30 13:01:19 +03:00
Ilya Averyanov b8e8f7c8e0 feat(queue): add pre_renew_streams callback 2024-07-30 13:01:18 +03:00
Ilya Averyanov a97a0d6400 feat(queue): fix dialyzer issues 2024-07-30 13:01:18 +03:00
Ilya Averyanov 8705956cdc feat(queue): update docs 2024-07-30 13:01:18 +03:00
Ilya Averyanov f213569460 feat(queue): clarify naming; identify shared subs by full topic filter 2024-07-30 13:01:18 +03:00
Ilya Averyanov 7e23f8d19f feat(queue): fix include 2024-07-30 13:01:17 +03:00
Ilya Averyanov a676ede6b8 feat(queue): improve logging 2024-07-30 13:01:17 +03:00
Ilya Averyanov 9e5e7a23c5 feat(queue): remove unnecessary acked flag 2024-07-30 13:01:17 +03:00
Ilya Averyanov 143086b0ef feat(queue): replace invalid rewind algorithm with skipping iterator 2024-07-30 13:01:16 +03:00
Ilya Averyanov c569625dd1 feat(queue): handle partially unacked ranges 2024-07-30 13:01:16 +03:00
Ilya Averyanov 7daab1ab23 feat(queue): move replay progress to a separate data structure 2024-07-30 13:01:16 +03:00
Ilya Averyanov 077ee38530 feat(queue): add config 2024-07-30 13:01:15 +03:00
Ilya Averyanov b74189570d feat(queue): do not use ee app from emqx app 2024-07-30 13:01:15 +03:00
Ilya Averyanov 649cf88042 feat(queue): kick agents that do not return to the replaying state for long 2024-07-30 13:01:15 +03:00
Ilya Averyanov 1496f7f778 feat(queue): add leader_rank_progress test 2024-07-30 13:01:15 +03:00
Ilya Averyanov 91dd1183ad feat(queue): fix dialyzer issues 2024-07-30 13:01:14 +03:00
Ilya Averyanov 65ab81ff74 feat(queue): fix quick resubscription 2024-07-30 13:01:14 +03:00
Ilya Averyanov 53d4cd3174 feat(queue): rename leader' stream_progresses to stream_states 2024-07-30 13:01:14 +03:00
Ilya Averyanov 7d004b37da feat(queue): implement stream finalization 2024-07-30 13:01:13 +03:00
Ilya Averyanov e5547005eb feat(queue): implement resubscribe test 2024-07-30 13:01:13 +03:00
Ilya Averyanov fada2a3fea feat(queue): reorganize and document shared subs module 2024-07-30 13:01:13 +03:00
Ilya Averyanov b4a010d63b feat(queue): implement unsubscribe 2024-07-30 13:01:13 +03:00
Ilya Averyanov 9bde981c44 feat(queue): fix static check issues 2024-07-30 13:01:12 +03:00
Ilya Averyanov 7658e081c5 feat(queue): move design docs to the EIP 2024-07-30 13:01:12 +03:00
Ilya Averyanov 8dce530d15 feat(queue): fix progress reporting and more tests
We test reassignment during intensive replay.
2024-07-30 13:01:12 +03:00
Ilya Averyanov a20d262327 feat(queue): send progress before fetching new messages 2024-07-30 13:01:11 +03:00
Ilya Averyanov d32f282feb feat(queue): add graceful disconnect 2024-07-30 13:01:11 +03:00
Ilya Averyanov 1d728a05b2 feat(queue): send metadata with agent when connecting to leader
It will be used to attach agent taints to improve stream assignment.
2024-07-30 13:01:11 +03:00
Ilya Averyanov 49bff5c08a feat(queue): wrap remote calls in a proto 2024-07-30 13:01:10 +03:00
Ilya Averyanov 61eda0ff31 feat(queue): identify agents by SessionId in tests 2024-07-30 13:01:10 +03:00
Ilya Averyanov 8f0d807c00 feat(queue): add new test scenarios 2024-07-30 13:01:10 +03:00
Ilya Averyanov bceb5d43ed feat(queue): fix stream rebalancing issues, update tests 2024-07-30 13:01:10 +03:00
Ilya Averyanov 03fea34962 feat(queue): document protocol between agent and leader
Document leader's states
2024-07-30 13:01:09 +03:00
Ilya Averyanov 082514f557 feat(queue): implement full protocol between agent and leader 2024-07-30 13:01:09 +03:00
Ilya Averyanov c831f0772f feat(queue): handle renew_lease_timeout 2024-07-30 13:01:09 +03:00
Ivan Dyachkov ca455ad992
Merge pull request #13532 from emqx/sync-release-57-20240729-021938
Sync release-57
2024-07-30 10:49:09 +02:00
JianBo He c347c2c285
Merge pull request #13540 from zmstone/0729-rule-funciton-getenv-should-be-limited-to-vars-with-prefix-EMQXVAR_
refactor: force getenv to access only OS env with prefix EMQXVAR_
2024-07-30 08:23:24 +08:00
zmstone a49cd78aae refactor: force getenv to access only OS env with prefix EMQXVAR_ 2024-07-29 23:54:00 +02:00
zmstone 4065158be7
Merge pull request #13534 from JimMoen/feat-add-superuser-skip-authz
feat: add authz skipped trace for superuser
2024-07-29 22:30:13 +02:00
zmstone 7f7d0741d2
Merge pull request #13528 from zmstone/0726-unrecoverable-error-limit
0726 unrecoverable error limit
2024-07-29 21:32:14 +02:00
zmstone 5b50d5433a
Merge pull request #13537 from thalesmg/20240729-r57-auto-decode-payload-kprodu
feat: attempt to automatically decode `payload` similar to key and message templates
2024-07-29 21:03:41 +02:00
zmstone eab440e0c1 docs: add changelog for PR 13528 2024-07-29 20:41:57 +02:00
zmstone e08425e67d refactor(log-throttler): remove unnecessary code
there is no need to reset counters before erasing
2024-07-29 20:41:27 +02:00
zhongwencool f6f1d32da0 feat: throttle with resource_id 2024-07-29 20:41:27 +02:00
zhongwencool 2924ec582a feat: add unrecoverable_resource_error throttle 2024-07-29 20:41:27 +02:00
zhongwencool 8dc1d1424a chore: add resource tag for log 2024-07-29 20:41:27 +02:00
Thales Macedo Garitezi 693d5dd394 feat: attempt to automatically decode `payload` similar to key and message templates 2024-07-29 13:01:06 -03:00
lafirest 60aefd1065
Merge pull request #13520 from lafirest/feat/scram-rest-acl
feat(scram): supports ACL rules in `scram_restapi` backend
2024-07-29 22:46:50 +08:00
JianBo He c637422302
Merge pull request #13518 from thalesmg/20240724-r57-dynamic-kprodu-action-mkIII
feat(kafka producer): allow dynamic topics (mkIII)
2024-07-29 22:43:20 +08:00
Thales Macedo Garitezi 6786c9b517
refactor: improve descriptions and identifiers
Co-authored-by: zmstone <zmstone@gmail.com>
2024-07-29 09:45:52 -03:00
Thales Macedo Garitezi 8913de10c0
Merge pull request #13527 from thalesmg/20240726-r57-multiple-froms-sql-test
fix(rule engine tester): fix message publish with bridge source in from clause
2024-07-29 09:37:17 -03:00
JimMoen 5ddd7d7a6a
test: superuser skipped all authz check 2024-07-29 17:27:36 +08:00
JimMoen d7cac74bed
feat: add authz skipped trace 2024-07-29 17:27:36 +08:00
JianBo He 0b0a28ae44
chore: update changes/ee/feat-13504.en.md
Co-authored-by: zmstone <zmstone@gmail.com>
2024-07-29 10:45:24 +08:00
id c1e2801f41 Merge remote-tracking branch 'origin/release-57' into sync-release-57-20240729-021938 2024-07-29 02:19:38 +00:00
Thales Macedo Garitezi 1d56ac6e5e refactor: change topic schema type 2024-07-26 14:26:21 -03:00
Thales Macedo Garitezi 4e0742c66f feat: make kafka producer freely dynamic 2024-07-26 14:25:20 -03:00
lafirest b2f2af6871
Merge pull request #13523 from lafirest/fix/oidc
fix(oidc): fixed update and callback errors for OIDC
2024-07-26 21:09:11 +08:00
Thales Macedo Garitezi 3fae704903 fix(rule engine tester): fix message publish with bridge source in from clause
Fixes https://emqx.atlassian.net/browse/EMQX-12762
2024-07-26 09:27:16 -03:00
lafirest 2d6b2bff8e
Merge pull request #13524 from lafirest/feat/exclusive-cli
feat(exclusive): added CLI interface for exclusive topics
2024-07-26 19:54:03 +08:00
firest dc342a35ac chore: update changes 2024-07-26 17:18:52 +08:00
firest 397c104a85 feat(exclusive): added CLI interface for exclusive topics 2024-07-26 16:56:47 +08:00
firest 49b24a3049 fix(oidc): fixed update and callback errors for OIDC 2024-07-26 15:41:22 +08:00
firest 7bf70aaab6 feat(scram): supports ACL rules in `scram_restapi` backend 2024-07-26 14:30:28 +08:00
zmstone 9a5d50f26a
Merge pull request #13521 from zmstone/0725-add-ldap-reconnect-on-timeout
0725 add ldap reconnect on timeout
2024-07-26 08:29:02 +02:00
Thales Macedo Garitezi df1f4fad70 feat(kafka producer): allow dynamic topics from pre-configured topics
Fixes https://emqx.atlassian.net/browse/EMQX-12656
2024-07-25 15:33:12 -03:00
Thales Macedo Garitezi 33eccb35da chore: update wolff -> 3.0.2 2024-07-25 14:30:50 -03:00
zmstone f6a0f56771 docs: add changelog for PR 13521 2024-07-25 19:24:52 +02:00
zmstone 7631420eef test: add test case to cover ldap search timeout 2024-07-25 18:43:37 +02:00
zmstone 8f94e9684c fix: handle ldap search error 2024-07-25 18:42:09 +02:00
zmstone 43f799508a chore: add ldap test doc 2024-07-25 18:42:08 +02:00
lafirest 1925ed2f55
Merge pull request #13507 from lafirest/feat/env_func
feat(variform): add a builtin function to get env vars
2024-07-25 22:46:50 +08:00
lafirest a45f817f0e
Merge pull request #13515 from lafirest/fix/exclusive
fix(exclusive): allow the same client to resubscribe to an existing exclusive topic
2024-07-25 21:26:08 +08:00
firest 57959ac7d4 chore: update changes 2024-07-25 18:59:40 +08:00
firest 79020b2436 feat(variform): add a builtin function to get env vars 2024-07-25 18:50:53 +08:00
firest 141d8144e4 fix(scram): change the name from `scram_http` to `scram_restapi` 2024-07-25 17:01:49 +08:00
firest 4f21594707 chore: update changes 2024-07-25 09:40:20 +08:00
firest 117c8197d7 fix(exclusive): allow the same client to resubscribe to an existing exclusive topic 2024-07-25 09:40:15 +08:00
Thales Macedo Garitezi c728b98e79
Merge pull request #13510 from thalesmg/20240723-r57-fix-jwt-about-to-expire-check
fix(jwt): fix grace period for renewal check
2024-07-24 16:52:35 -03:00
Thales Macedo Garitezi 9e65e0d048
Merge pull request #13503 from thalesmg/20240722-r57-resource-manager-hc-interval-startup
fix(connector resource): use configuration `resource_opts` for health check interval when starting up
2024-07-24 09:15:47 -03:00
Thales Macedo Garitezi 7374123c5c fix(jwt): fix grace period for renewal check 2024-07-23 17:25:29 -03:00
Thales Macedo Garitezi 8ae54ac325 fix(connector resource): use configuration `resource_opts` for health check interval when starting up
Fixes https://emqx.atlassian.net/browse/EMQX-12738
2024-07-22 11:34:10 -03:00
243 changed files with 7638 additions and 1811 deletions

View File

@ -10,7 +10,7 @@ services:
nofile: 1024
image: openldap
#ports:
# - 389:389
# - "389:389"
volumes:
- ./certs/ca.crt:/etc/certs/ca.crt
restart: always

View File

@ -0,0 +1,61 @@
# LDAP authentication
To run manual tests with the default docker-compose files, expose the openldap container port by uncommenting the `ports` config in `docker-compose-ldap.yaml`.
To start openldap:
```
docker-compose -f ./.ci/docker-compose-file/docker-compose.yaml -f ./.ci/docker-compose-file/docker-compose-ldap.yaml up -d
```
## LDAP database
The LDAP database is populated from the files below:
```
apps/emqx_ldap/test/data/emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
apps/emqx_ldap/test/data/emqx.schema /usr/local/etc/openldap/schema/emqx.schema
```
## Minimal EMQX config
```
authentication = [
{
backend = ldap
base_dn = "uid=${username},ou=testdevice,dc=emqx,dc=io"
filter = "(& (objectClass=mqttUser) (uid=${username}))"
mechanism = password_based
method {
is_superuser_attribute = isSuperuser
password_attribute = userPassword
type = hash
}
password = public
pool_size = 8
query_timeout = "5s"
request_timeout = "10s"
server = "localhost:1389"
username = "cn=root,dc=emqx,dc=io"
}
]
```
## Example ldapsearch command
```
ldapsearch -x -H ldap://localhost:389 -D "cn=root,dc=emqx,dc=io" -W -b "uid=mqttuser0007,ou=testdevice,dc=emqx,dc=io" "(&(objectClass=mqttUser)(uid=mqttuser0007))"
```
## Example mqttx command
The client password hashes are generated from their usernames.
```
# disabled user
mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0006 -P mqttuser0006
# enabled super-user
mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0007 -P mqttuser0007
```

View File

@ -51,7 +51,7 @@ runs:
echo "SELF_HOSTED=false" >> $GITHUB_OUTPUT
;;
esac
- uses: actions/cache@ab5e6d0c87105b4c9c2047343972218f562e4319 # v4.0.1
- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
id: cache
if: steps.prepare.outputs.SELF_HOSTED != 'true'
with:

View File

@ -152,7 +152,7 @@ jobs:
echo "PROFILE=${PROFILE}" | tee -a .env
echo "PKG_VSN=$(./pkg-vsn.sh ${PROFILE})" | tee -a .env
zip -ryq -x@.github/workflows/.zipignore $PROFILE.zip .
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.profile }}
path: ${{ matrix.profile }}.zip

View File

@ -163,7 +163,7 @@ jobs:
echo "PROFILE=${PROFILE}" | tee -a .env
echo "PKG_VSN=$(./pkg-vsn.sh ${PROFILE})" | tee -a .env
zip -ryq -x@.github/workflows/.zipignore $PROFILE.zip .
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.profile }}
path: ${{ matrix.profile }}.zip

View File

@ -83,7 +83,7 @@ jobs:
id: build
run: |
make ${{ matrix.profile }}-tgz
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ matrix.profile }}-${{ matrix.arch }}.tar.gz"
path: "_packages/emqx*/emqx-*.tar.gz"
@ -122,9 +122,10 @@ jobs:
run: |
ls -lR _packages/$PROFILE
mv _packages/$PROFILE/*.tar.gz ./
- name: Enable containerd image store on Docker Engine
run: |
echo "$(jq '. += {"features": {"containerd-snapshotter": true}}' /etc/docker/daemon.json)" > daemon.json
echo "$(sudo cat /etc/docker/daemon.json | jq '. += {"features": {"containerd-snapshotter": true}}')" > daemon.json
sudo mv daemon.json /etc/docker/daemon.json
sudo systemctl restart docker

View File

@ -51,7 +51,7 @@ jobs:
if: always()
run: |
docker save $_EMQX_DOCKER_IMAGE_TAG | gzip > $EMQX_NAME-docker-$PKG_VSN.tar.gz
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ env.EMQX_NAME }}-docker"
path: "${{ env.EMQX_NAME }}-docker-${{ env.PKG_VSN }}.tar.gz"

View File

@ -95,7 +95,7 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: ${{ matrix.profile }}-${{ matrix.os }}-${{ matrix.otp }}
@ -180,7 +180,7 @@ jobs:
--builder $BUILDER \
--elixir $IS_ELIXIR \
--pkgtype pkg
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.profile }}-${{ matrix.os }}-${{ matrix.arch }}${{ matrix.with_elixir == 'yes' && '-elixir' || '' }}-${{ matrix.builder }}-${{ matrix.otp }}-${{ matrix.elixir }}
path: _packages/${{ matrix.profile }}/

View File

@ -23,6 +23,7 @@ jobs:
profile:
- ['emqx', 'master']
- ['emqx', 'release-57']
- ['emqx', 'release-58']
os:
- ubuntu22.04
- amzn2023
@ -53,7 +54,7 @@ jobs:
- name: build pkg
run: |
./scripts/buildx.sh --profile "$PROFILE" --pkgtype pkg --builder "$BUILDER"
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: ${{ matrix.profile[0] }}-${{ matrix.profile[1] }}-${{ matrix.os }}
@ -101,7 +102,7 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: ${{ matrix.profile }}-${{ matrix.os }}

View File

@ -41,13 +41,13 @@ jobs:
- name: build pkg
run: |
./scripts/buildx.sh --profile $PROFILE --pkgtype pkg --elixir $ELIXIR --arch $ARCH
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ matrix.profile[0] }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"
path: _packages/${{ matrix.profile[0] }}/*
retention-days: 7
compression-level: 0
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ matrix.profile[0] }}-schema-dump-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"
path: |
@ -84,7 +84,7 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.os }}
path: _packages/**/*

View File

@ -37,7 +37,7 @@ jobs:
- run: ./scripts/check-elixir-deps-discrepancies.exs
- run: ./scripts/check-elixir-applications.exs
- name: Upload produced lock files
uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: ${{ matrix.profile }}_produced_lock_files

View File

@ -24,6 +24,7 @@ jobs:
branch:
- master
- release-57
- release-58
language:
- cpp
- python

View File

@ -24,6 +24,7 @@ jobs:
ref:
- master
- release-57
- release-58
steps:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:

View File

@ -52,7 +52,7 @@ jobs:
id: package_file
run: |
echo "PACKAGE_FILE=$(find _packages/emqx -name 'emqx-*.deb' | head -n 1 | xargs basename)" >> $GITHUB_OUTPUT
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: emqx-ubuntu20.04
path: _packages/emqx/${{ steps.package_file.outputs.PACKAGE_FILE }}
@ -113,13 +113,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform
@ -184,13 +184,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform
@ -257,13 +257,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform
@ -330,13 +330,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform

View File

@ -40,7 +40,7 @@ jobs:
if: failure()
run: |
cat _build/${{ matrix.profile }}/rel/emqx/log/erlang.log.*
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: conftest-logs-${{ matrix.profile }}

View File

@ -95,7 +95,7 @@ jobs:
echo "Suites: $SUITES"
./rebar3 as standalone_test ct --name 'test@127.0.0.1' -v --readable=true --suite="$SUITES"
fi
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: logs-emqx-app-tests-${{ matrix.type }}

View File

@ -31,7 +31,7 @@ jobs:
else
wget --no-verbose --no-check-certificate -O /tmp/apache-jmeter.tgz $ARCHIVE_URL
fi
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: apache-jmeter.tgz
path: /tmp/apache-jmeter.tgz
@ -95,7 +95,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-advanced_feat-${{ matrix.scripts_type }}
@ -175,7 +175,7 @@ jobs:
if: failure()
run: |
docker compose -f .ci/docker-compose-file/docker-compose-emqx-cluster.yaml logs --no-color > ./jmeter_logs/emqx.log
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-pgsql_authn_authz-${{ matrix.scripts_type }}_${{ matrix.pgsql_tag }}
@ -248,7 +248,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-mysql_authn_authz-${{ matrix.scripts_type }}_${{ matrix.mysql_tag }}
@ -313,7 +313,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-JWT_authn-${{ matrix.scripts_type }}
@ -370,7 +370,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-built_in_database_authn_authz-${{ matrix.scripts_type }}

View File

@ -45,7 +45,7 @@ jobs:
run: |
export PROFILE='emqx-enterprise'
make emqx-enterprise-tgz
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
name: Upload built emqx and test scenario
with:
name: relup_tests_emqx_built
@ -111,7 +111,7 @@ jobs:
docker logs node2.emqx.io | tee lux_logs/emqx2.log
exit 1
fi
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
name: Save debug data
if: failure()
with:

View File

@ -133,7 +133,7 @@ jobs:
if: failure()
run: tar -czf logs.tar.gz _build/test/logs
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: logs-${{ matrix.profile }}-${{ matrix.prefix }}-sg${{ matrix.suitegroup }}
@ -193,7 +193,7 @@ jobs:
if: failure()
run: tar -czf logs.tar.gz _build/test/logs
- uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: logs-${{ matrix.profile }}-${{ matrix.prefix }}-sg${{ matrix.suitegroup }}

View File

@ -40,7 +40,7 @@ jobs:
publish_results: true
- name: "Upload artifact"
uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: SARIF file
path: results.sarif

View File

@ -10,8 +10,8 @@ include env.sh
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.9.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.7.1
export EMQX_DASHBOARD_VERSION ?= v1.10.0-beta.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.8.0-beta.1
export EMQX_RELUP ?= true
export EMQX_REL_FORM ?= tgz

View File

@ -65,9 +65,20 @@
%% Route
%%--------------------------------------------------------------------
-record(share_dest, {
session_id :: emqx_session:session_id(),
group :: emqx_types:group()
}).
-record(route, {
topic :: binary(),
dest :: node() | {binary(), node()} | emqx_session:session_id() | emqx_external_broker:dest()
dest ::
node()
| {binary(), node()}
| emqx_session:session_id()
%% One session can also have multiple subscriptions to the same topic through different groups
| #share_dest{}
| emqx_external_broker:dest()
}).
%%--------------------------------------------------------------------

View File

@ -683,6 +683,7 @@ end).
-define(FRAME_PARSE_ERROR, frame_parse_error).
-define(FRAME_SERIALIZE_ERROR, frame_serialize_error).
-define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})).
-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})).

View File

@ -32,7 +32,7 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.7.1").
-define(EMQX_RELEASE_CE, "5.8.0-alpha.1").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.7.1").
-define(EMQX_RELEASE_EE, "5.8.0-alpha.1").

View File

@ -41,16 +41,20 @@
).
%% NOTE: do not forget to use atom for msg and add every used msg to
%% the default value of `log.thorttling.msgs` list.
%% the default value of `log.throttling.msgs` list.
-define(SLOG_THROTTLE(Level, Data),
?SLOG_THROTTLE(Level, Data, #{})
).
-define(SLOG_THROTTLE(Level, Data, Meta),
?SLOG_THROTTLE(Level, undefined, Data, Meta)
).
-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta),
case logger:allow(Level, ?MODULE) of
true ->
(fun(#{msg := __Msg} = __Data) ->
case emqx_log_throttler:allow(__Msg) of
case emqx_log_throttler:allow(__Msg, UniqueKey) of
true ->
logger:log(Level, __Data, Meta);
false ->
@ -87,7 +91,7 @@
?_DO_TRACE(Tag, Msg, Meta),
?SLOG(
Level,
(emqx_trace_formatter:format_meta_map(Meta))#{msg => Msg, tag => Tag},
(Meta)#{msg => Msg, tag => Tag},
#{is_trace => false}
)
end).

View File

@ -27,6 +27,7 @@
{emqx_ds,2}.
{emqx_ds,3}.
{emqx_ds,4}.
{emqx_ds_shared_sub,1}.
{emqx_eviction_agent,1}.
{emqx_eviction_agent,2}.
{emqx_eviction_agent,3}.

View File

@ -28,15 +28,14 @@
{lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
{ra, "2.7.3"}
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}}
]}.
{plugins, [{rebar3_proper, "0.12.1"}, rebar3_path_deps]}.

View File

@ -2,7 +2,7 @@
{application, emqx, [
{id, "emqx"},
{description, "EMQX Core"},
{vsn, "5.3.3"},
{vsn, "5.3.4"},
{modules, []},
{registered, []},
{applications, [

View File

@ -146,7 +146,9 @@
-type replies() :: emqx_types:packet() | reply() | [reply()].
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
-define(IS_CONNECTED_OR_REAUTHENTICATING(ConnState),
((ConnState == connected) orelse (ConnState == reauthenticating))
).
-define(IS_COMMON_SESSION_TIMER(N),
((N == retry_delivery) orelse (N == expire_awaiting_rel))
).
@ -337,7 +339,7 @@ take_conn_info_fields(Fields, ClientInfo, ConnInfo) ->
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}.
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
@ -567,29 +569,8 @@ handle_in(
process_disconnect(ReasonCode, Properties, NChannel);
handle_in(?AUTH_PACKET(), Channel) ->
handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(shutdown_count(frame_error, Reason), Channel);
handle_in(
{frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting}
) ->
shutdown(
shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
handle_in(
{frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
{ok, Channel};
handle_in({frame_error, Reason}, Channel) ->
handle_frame_error(Reason, Channel);
handle_in(Packet, Channel) ->
?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
@ -1021,6 +1002,68 @@ not_nacked({deliver, _Topic, Msg}) ->
true
end.
%%--------------------------------------------------------------------
%% Handle Frame Error
%%--------------------------------------------------------------------
handle_frame_error(
Reason = #{cause := frame_too_large},
Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
) when
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
ShutdownCount = shutdown_count(frame_error, Reason),
case proto_ver(Reason, ConnInfo) of
?MQTT_PROTO_V5 ->
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
_ ->
shutdown(ShutdownCount, Channel)
end;
%% Only send CONNACK with reason code `frame_too_large` for MQTT-v5.0 when connecting,
%% otherwise DONOT send any CONNACK or DISCONNECT packet.
handle_frame_error(
Reason,
Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
) when
is_map(Reason) andalso
(ConnState == idle orelse ConnState == connecting)
->
ShutdownCount = shutdown_count(frame_error, Reason),
ProtoVer = proto_ver(Reason, ConnInfo),
NChannel = Channel#channel{conninfo = ConnInfo#{proto_ver => ProtoVer}},
case ProtoVer of
?MQTT_PROTO_V5 ->
shutdown(ShutdownCount, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), NChannel);
_ ->
shutdown(ShutdownCount, NChannel)
end;
handle_frame_error(
Reason,
Channel = #channel{conn_state = connecting}
) ->
shutdown(
shutdown_count(frame_error, Reason),
?CONNACK_PACKET(?RC_MALFORMED_PACKET),
Channel
);
handle_frame_error(
Reason,
Channel = #channel{conn_state = ConnState}
) when
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
handle_out(
disconnect,
{?RC_MALFORMED_PACKET, Reason},
Channel
);
handle_frame_error(
Reason,
Channel = #channel{conn_state = disconnected}
) ->
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle outgoing packet
%%--------------------------------------------------------------------
@ -1289,7 +1332,7 @@ handle_info(
session = Session
}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
{Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
@ -2636,8 +2679,7 @@ save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) ->
NAliases = maps:put(Topic, AliasId, Aliases),
TopicAliases#{outbound => NAliases}.
-compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}).
-compile({inline, [reply/2, shutdown/2, shutdown/3]}).
reply(Reply, Channel) ->
{reply, Reply, Channel}.
@ -2673,13 +2715,13 @@ disconnect_and_shutdown(
?IS_MQTT_V5 =
#channel{conn_state = ConnState}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
%% mqtt v3/v4 connected sessions
disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, Reply, NChannel);
@ -2722,6 +2764,13 @@ is_durable_session(#channel{session = Session}) ->
false
end.
proto_ver(#{proto_ver := ProtoVer}, _ConnInfo) ->
ProtoVer;
proto_ver(_Reason, #{proto_ver := ProtoVer}) ->
ProtoVer;
proto_ver(_, _) ->
?MQTT_PROTO_V4.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------

View File

@ -783,7 +783,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
input_bytes => Data,
parsed_packets => Packets
}),
{[{frame_error, Reason} | Packets], State};
NState = enrich_state(Reason, State),
{[{frame_error, Reason} | Packets], NState};
error:Reason:Stacktrace ->
?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState),
@ -1227,6 +1228,12 @@ inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc),
ok.
enrich_state(#{parse_state := NParseState}, State) ->
Serialize = emqx_frame:serialize_opts(NParseState),
State#state{parse_state = NParseState, serialize = Serialize};
enrich_state(_, State) ->
State.
set_tcp_keepalive({quic, _Listener}) ->
ok;
set_tcp_keepalive({Type, Id}) ->

View File

@ -117,6 +117,13 @@ try_subscribe(ClientId, Topic) ->
write
),
allow;
[#exclusive_subscription{clientid = ClientId, topic = Topic}] ->
%% Fixed the issue-13476
%% In this feature, the user must manually call `unsubscribe` to release the lock,
%% but sometimes the node may go down for some reason,
%% then the client will reconnect to this node and resubscribe.
%% We need to allow resubscription, otherwise the lock will never be released.
allow;
[_] ->
deny
end.

View File

@ -43,7 +43,9 @@
add_shared_route/2,
delete_shared_route/2,
add_persistent_route/2,
delete_persistent_route/2
delete_persistent_route/2,
add_persistent_shared_route/3,
delete_persistent_shared_route/3
]).
-export_type([dest/0]).
@ -129,6 +131,12 @@ add_persistent_route(Topic, ID) ->
delete_persistent_route(Topic, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
add_persistent_shared_route(Topic, Group, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
delete_persistent_shared_route(Topic, Group, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -267,28 +267,50 @@ packet(Header, Variable) ->
packet(Header, Variable, Payload) ->
#mqtt_packet{header = Header, variable = Variable, payload = Payload}.
parse_connect(FrameBin, StrictMode) ->
{ProtoName, Rest} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
case ProtoName of
<<"MQTT">> ->
ok;
<<"MQIsdp">> ->
ok;
_ ->
%% from spec: the server MAY send disconnect with reason code 0x84
%% we chose to close socket because the client is likely not talking MQTT anyway
?PARSE_ERR(#{
cause => invalid_proto_name,
expected => <<"'MQTT' or 'MQIsdp'">>,
received => ProtoName
})
end,
parse_connect2(ProtoName, Rest, StrictMode).
parse_connect(FrameBin, Options = #{strict_mode := StrictMode}) ->
{ProtoName, Rest0} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
%% No need to parse and check proto_ver if proto_name is invalid, check it first
%% And the matching check of `proto_name` and `proto_ver` fields will be done in `emqx_packet:check_proto_ver/2`
_ = validate_proto_name(ProtoName),
{IsBridge, ProtoVer, Rest2} = parse_connect_proto_ver(Rest0),
NOptions = Options#{version => ProtoVer},
try
do_parse_connect(ProtoName, IsBridge, ProtoVer, Rest2, StrictMode)
catch
throw:{?FRAME_PARSE_ERROR, ReasonM} when is_map(ReasonM) ->
?PARSE_ERR(
ReasonM#{
proto_ver => ProtoVer,
proto_name => ProtoName,
parse_state => ?NONE(NOptions)
}
);
throw:{?FRAME_PARSE_ERROR, Reason} ->
?PARSE_ERR(
#{
cause => Reason,
proto_ver => ProtoVer,
proto_name => ProtoName,
parse_state => ?NONE(NOptions)
}
)
end.
parse_connect2(
do_parse_connect(
ProtoName,
<<BridgeTag:4, ProtoVer:4, UsernameFlagB:1, PasswordFlagB:1, WillRetainB:1, WillQoS:2,
WillFlagB:1, CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>,
IsBridge,
ProtoVer,
<<
UsernameFlagB:1,
PasswordFlagB:1,
WillRetainB:1,
WillQoS:2,
WillFlagB:1,
CleanStart:1,
Reserved:1,
KeepAlive:16/big,
Rest/binary
>>,
StrictMode
) ->
_ = validate_connect_reserved(Reserved),
@ -303,14 +325,14 @@ parse_connect2(
UsernameFlag = bool(UsernameFlagB),
PasswordFlag = bool(PasswordFlagB)
),
{Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
{Properties, Rest3} = parse_properties(Rest, ProtoVer, StrictMode),
{ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid),
ConnPacket = #mqtt_packet_connect{
proto_name = ProtoName,
proto_ver = ProtoVer,
%% For bridge mode, non-standard implementation
%% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html
is_bridge = (BridgeTag =:= 8),
is_bridge = IsBridge,
clean_start = bool(CleanStart),
will_flag = WillFlag,
will_qos = WillQoS,
@ -343,16 +365,16 @@ parse_connect2(
unexpected_trailing_bytes => size(Rest7)
})
end;
parse_connect2(_ProtoName, Bin, _StrictMode) ->
%% sent less than 32 bytes
do_parse_connect(_ProtoName, _IsBridge, _ProtoVer, Bin, _StrictMode) ->
%% sent less than 24 bytes
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
parse_packet(
#mqtt_packet_header{type = ?CONNECT},
FrameBin,
#{strict_mode := StrictMode}
Options
) ->
parse_connect(FrameBin, StrictMode);
parse_connect(FrameBin, Options);
parse_packet(
#mqtt_packet_header{type = ?CONNACK},
<<AckFlags:8, ReasonCode:8, Rest/binary>>,
@ -516,6 +538,12 @@ parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
parse_packet_id(_) ->
?PARSE_ERR(invalid_packet_id).
parse_connect_proto_ver(<<BridgeTag:4, ProtoVer:4, Rest/binary>>) ->
{_IsBridge = (BridgeTag =:= 8), ProtoVer, Rest};
parse_connect_proto_ver(Bin) ->
%% sent less than 1 byte (i.e. an empty binary)
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
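The bridge tag and the protocol version share one byte; a hypothetical shell session showing the split:

%% 1> <<BridgeTag:4, ProtoVer:4>> = <<131>>.  %% 131 = (8 bsl 4) + 3
%% 2> {BridgeTag =:= 8, ProtoVer}.
%% {true,3}                                   %% a bridge connection speaking MQTT v3.1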
parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 ->
{#{}, Bin};
%% TODO: version mess?
@ -739,6 +767,8 @@ serialize_fun(#{version := Ver, max_size := MaxSize, strict_mode := StrictMode})
initial_serialize_opts(Opts) ->
maps:merge(?DEFAULT_OPTIONS, Opts).
serialize_opts(?NONE(Options)) ->
maps:merge(?DEFAULT_OPTIONS, Options);
serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
#{version => ProtoVer, max_size => MaxSize, strict_mode => false}.
@ -1157,18 +1187,34 @@ validate_subqos([3 | _]) -> ?PARSE_ERR(bad_subqos);
validate_subqos([_ | T]) -> validate_subqos(T);
validate_subqos([]) -> ok.
%% from spec: the server MAY send a DISCONNECT with reason code 0x84
%% we chose to close the socket because the client is likely not talking MQTT anyway
validate_proto_name(<<"MQTT">>) ->
ok;
validate_proto_name(<<"MQIsdp">>) ->
ok;
validate_proto_name(ProtoName) ->
?PARSE_ERR(#{
cause => invalid_proto_name,
expected => <<"'MQTT' or 'MQIsdp'">>,
received => ProtoName
}).
%% MQTT-v3.1.1-[MQTT-3.1.2-3], MQTT-v5.0-[MQTT-3.1.2-3]
-compile({inline, [validate_connect_reserved/1]}).
validate_connect_reserved(0) -> ok;
validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag).
-compile({inline, [validate_connect_will/3]}).
%% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
validate_connect_will(false, _, WillQos) when WillQos > 0 -> ?PARSE_ERR(invalid_will_qos);
validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos);
%% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos);
%% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
validate_connect_will(false, WillRetain, _) when WillRetain -> ?PARSE_ERR(invalid_will_retain);
validate_connect_will(_, _, _) -> ok.
-compile({inline, [validate_connect_password_flag/4]}).
%% MQTT-v3.1
%% Username flag and password flag are not strongly related
%% https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect
@ -1183,6 +1229,7 @@ validate_connect_password_flag(true, ?MQTT_PROTO_V5, _, _) ->
validate_connect_password_flag(_, _, _, _) ->
ok.
-compile({inline, [bool/1]}).
bool(0) -> false;
bool(1) -> true.

View File

@ -432,7 +432,7 @@ do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTE
esockd:open(
Id,
ListenOn,
merge_default(esockd_opts(Id, Type, Name, Opts))
merge_default(esockd_opts(Id, Type, Name, Opts, _OldOpts = undefined))
);
%% Start MQTT/WS listener
do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
@ -476,7 +476,7 @@ do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when
Id = listener_id(Type, Name),
case maps:get(bind, OldConf) of
ListenOn ->
esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf, OldConf));
_Different ->
%% TODO
%% Again, we're not strictly required to drop live connections in this case.
@ -588,7 +588,7 @@ perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) ->
perform_listener_change(stop, {Type, Name, Conf}) ->
stop_listener(Type, Name, Conf).
esockd_opts(ListenerId, Type, Name, Opts0) ->
esockd_opts(ListenerId, Type, Name, Opts0, OldOpts) ->
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
Limiter = limiter(Opts0),
Opts2 =
@ -620,7 +620,7 @@ esockd_opts(ListenerId, Type, Name, Opts0) ->
tcp ->
Opts3#{tcp_options => tcp_opts(Opts0)};
ssl ->
OptsWithCRL = inject_crl_config(Opts0),
OptsWithCRL = inject_crl_config(Opts0, OldOpts),
OptsWithSNI = inject_sni_fun(ListenerId, OptsWithCRL),
OptsWithRootFun = inject_root_fun(OptsWithSNI),
OptsWithVerifyFun = inject_verify_fun(OptsWithRootFun),
@ -996,7 +996,7 @@ inject_sni_fun(_ListenerId, Conf) ->
Conf.
inject_crl_config(
Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}
Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}, _OldOpts
) ->
HTTPTimeout = emqx_config:get([crl_cache, http_timeout], timer:seconds(15)),
Conf#{
@ -1006,7 +1006,16 @@ inject_crl_config(
crl_cache => {emqx_ssl_crl_cache, {internal, [{http, HTTPTimeout}]}}
}
};
inject_crl_config(Conf) ->
inject_crl_config(#{ssl_options := SSLOpts0} = Conf0, #{} = OldOpts) ->
%% Note: we must set the CRL options to `undefined' to unset them. Otherwise,
%% if they were previously enabled, `esockd' will retain them when
%% `esockd:merge_opts/2' is called.
WasEnabled = emqx_utils_maps:deep_get([ssl_options, enable_crl_check], OldOpts, false),
Undefine = fun(Acc, K) -> emqx_utils_maps:put_if(Acc, K, undefined, WasEnabled) end,
SSLOpts1 = Undefine(SSLOpts0, crl_check),
SSLOpts = Undefine(SSLOpts1, crl_cache),
Conf0#{ssl_options := SSLOpts};
inject_crl_config(Conf, undefined = _OldOpts) ->
Conf.
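A sketch of the unset behaviour, under the assumption that `emqx_utils_maps:put_if/4` inserts the key only when the flag is true:

%% WasEnabled = true:
%%   Undefine(#{verify => verify_peer}, crl_check)
%%     => #{verify => verify_peer, crl_check => undefined}  %% forces esockd to drop the stale value
%% WasEnabled = false:
%%   Undefine(#{verify => verify_peer}, crl_check)
%%     => #{verify => verify_peer}                          %% nothing to unset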
maybe_unregister_ocsp_stapling_refresh(

View File

@ -25,7 +25,7 @@
-export([start_link/0]).
%% throttler API
-export([allow/1]).
-export([allow/2]).
%% gen_server callbacks
-export([
@ -40,23 +40,29 @@
-define(SEQ_ID(Msg), {?MODULE, Msg}).
-define(NEW_SEQ, atomics:new(1, [{signed, false}])).
-define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)).
-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))).
-define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)).
-define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)).
-define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1).
-define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1).
-define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)).
-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
-spec allow(atom()) -> boolean().
allow(Msg) when is_atom(Msg) ->
%% @doc Check if a throttled log message is allowed to pass down to the logger this time.
%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined'
%% for predefined message IDs.
%% For relatively static resources created from configuration, such as data integration
%% resource IDs, `UniqueKey' should be a `binary()'.
-spec allow(atom(), undefined | binary()) -> boolean().
allow(Msg, UniqueKey) when
is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined)
->
case emqx_logger:get_primary_log_level() of
debug ->
true;
_ ->
do_allow(Msg)
do_allow(Msg, UniqueKey)
end.
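A hedged usage sketch of the new `allow/2` API (the caller function, resource ID, and log fields are hypothetical; assumes `logger.hrl` is included for `?SLOG`):

maybe_log_unrecoverable(ResId, Reason) ->
    case emqx_log_throttler:allow(unrecoverable_resource_error, ResId) of
        true ->
            ?SLOG(error, #{
                msg => "unrecoverable_resource_error",
                resource_id => ResId,
                reason => Reason
            });
        false ->
            %% throttled away within the current time window
            ok
    end.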
-spec start_link() -> startlink_ret().
@ -68,7 +74,8 @@ start_link() ->
%%--------------------------------------------------------------------
init([]) ->
ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
process_flag(trap_exit, true),
ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST),
CurrentPeriodMs = ?TIME_WINDOW_MS,
TimerRef = schedule_refresh(CurrentPeriodMs),
{ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
@ -86,16 +93,22 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
DroppedStats = lists:foldl(
fun(Msg, Acc) ->
case ?GET_SEQ(Msg) of
%% Should not happen, unless the static ids list is updated at run-time.
undefined ->
?NEW_THROTTLE(Msg, ?NEW_SEQ),
%% Should not happen, unless the static ids list is updated at run-time.
new_throttler(Msg),
?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
Acc;
SeqMap when is_map(SeqMap) ->
maps:fold(
fun(Key, Ref, Acc0) ->
ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]),
drop_stats(Ref, ID, Acc0)
end,
Acc,
SeqMap
);
SeqRef ->
Dropped = ?GET_DROPPED(SeqRef),
ok = ?RESET_SEQ(SeqRef),
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
maybe_add_dropped(Msg, Dropped, Acc)
drop_stats(SeqRef, Msg, Acc)
end
end,
#{},
@ -112,7 +125,16 @@ handle_info(Info, State) ->
?SLOG(error, #{msg => "unxpected_info", info => Info}),
{noreply, State}.
drop_stats(SeqRef, Msg, Acc) ->
Dropped = ?GET_DROPPED(SeqRef),
ok = ?RESET_SEQ(SeqRef),
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
maybe_add_dropped(Msg, Dropped, Acc).
terminate(_Reason, _State) ->
%% Atomics have no delete/remove/release/deallocate API;
%% the resource is released once the reference is garbage-collected.
lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST),
ok.
code_change(_OldVsn, State, _Extra) ->
@ -122,17 +144,27 @@ code_change(_OldVsn, State, _Extra) ->
%% internal functions
%%--------------------------------------------------------------------
do_allow(Msg) ->
do_allow(Msg, UniqueKey) ->
case persistent_term:get(?SEQ_ID(Msg), undefined) of
undefined ->
%% This is either a race condition (emqx_log_throttler is not started yet)
%% or a developer mistake (msg used in the ?SLOG_THROTTLE/2,3 macro is
%% not added to the default value of `log.throttling.msgs`).
?SLOG(info, #{
msg => "missing_log_throttle_sequence",
?SLOG(debug, #{
msg => "log_throttle_disabled",
throttled_msg => Msg
}),
true;
%% e.g. throttling of `unrecoverable_resource_error` messages per resource ID
SeqMap when is_map(SeqMap) ->
case maps:find(UniqueKey, SeqMap) of
{ok, SeqRef} ->
?IS_ALLOWED(SeqRef);
error ->
SeqRef = ?NEW_SEQ,
new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}),
true
end;
SeqRef ->
?IS_ALLOWED(SeqRef)
end.
@ -154,3 +186,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) ->
schedule_refresh(PeriodMs) ->
?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}),
erlang:send_after(PeriodMs, ?MODULE, refresh).
new_throttler(unrecoverable_resource_error = Msg) ->
new_throttler(Msg, #{});
new_throttler(Msg) ->
new_throttler(Msg, ?NEW_SEQ).
new_throttler(Msg, AtomicOrEmptyMap) ->
persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap).
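After this change the persistent_term layout mixes plain atomics (static message IDs) with per-key maps (dynamic IDs); a hypothetical snapshot, assuming the module is named `emqx_log_throttler` and `some_static_msg_id` is illustrative:

%% {emqx_log_throttler, some_static_msg_id}            => AtomicsRef
%% {emqx_log_throttler, unrecoverable_resource_error}  => #{<<"resource:abc">> => AtomicsRef}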

View File

@ -105,7 +105,7 @@ format(Msg, Meta, Config) ->
maybe_format_msg(undefined, _Meta, _Config) ->
#{};
maybe_format_msg({report, Report0} = Msg, #{report_cb := Cb} = Meta, Config) ->
Report = emqx_logger_textfmt:try_encode_payload(Report0, Config),
Report = emqx_logger_textfmt:try_encode_meta(Report0, Config),
case is_map(Report) andalso Cb =:= ?DEFAULT_FORMATTER of
true ->
%% reporting a map without a customised format function

View File

@ -20,7 +20,7 @@
-export([format/2]).
-export([check_config/1]).
-export([try_format_unicode/1, try_encode_payload/2]).
-export([try_format_unicode/1, try_encode_meta/2]).
%% Used in the other log formatters
-export([evaluate_lazy_values_if_dbg_level/1, evaluate_lazy_values/1]).
@ -111,7 +111,7 @@ is_list_report_acceptable(_) ->
enrich_report(ReportRaw0, Meta, Config) ->
%% clientid and peername always in emqx_conn's process metadata.
%% topic and username can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2
ReportRaw = try_encode_payload(ReportRaw0, Config),
ReportRaw = try_encode_meta(ReportRaw0, Config),
Topic =
case maps:get(topic, Meta, undefined) of
undefined -> maps:get(topic, ReportRaw, undefined);
@ -180,9 +180,22 @@ enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) ->
enrich_topic(Msg, _) ->
Msg.
try_encode_payload(#{payload := Payload} = Report, #{payload_encode := Encode}) ->
try_encode_meta(Report, Config) ->
lists:foldl(
fun(Meta, Acc) ->
try_encode_meta(Meta, Acc, Config)
end,
Report,
[payload, packet]
).
try_encode_meta(payload, #{payload := Payload} = Report, #{payload_encode := Encode}) ->
Report#{payload := encode_payload(Payload, Encode)};
try_encode_payload(Report, _Config) ->
try_encode_meta(packet, #{packet := Packet} = Report, #{payload_encode := Encode}) when
is_tuple(Packet)
->
Report#{packet := emqx_packet:format(Packet, Encode)};
try_encode_meta(_, Report, _Config) ->
Report.
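Hypothetical inputs illustrating the fold over `payload` and `packet`:

%% try_encode_meta(#{payload => <<1,2,3>>}, #{payload_encode => hex})
%%   => #{payload => <<"010203">>}
%% try_encode_meta(#{packet => Packet}, #{payload_encode => hidden})
%%   => #{packet => emqx_packet:format(Packet, hidden)}   %% only when Packet is a tuple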
encode_payload(Payload, text) ->
@ -190,4 +203,5 @@ encode_payload(Payload, text) ->
encode_payload(_Payload, hidden) ->
"******";
encode_payload(Payload, hex) ->
binary:encode_hex(Payload).
Bin = emqx_utils_conv:bin(Payload),
binary:encode_hex(Bin).

View File

@ -51,7 +51,6 @@
]).
-export([
format/1,
format/2
]).
@ -481,10 +480,6 @@ will_msg(#mqtt_packet_connect{
headers = #{username => Username, properties => Props}
}.
%% @doc Format packet
-spec format(emqx_types:packet()) -> iolist().
format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()).
%% @doc Format packet
-spec format(emqx_types:packet(), hex | text | hidden) -> iolist().
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) ->

View File

@ -621,9 +621,13 @@ handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
Session = replay_streams(Session0, ClientInfo),
{ok, [], Session};
handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) ->
S1 = emqx_persistent_session_ds_subs:gc(S0),
S2 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S2, SharedSubS0),
%% The `gc` and `renew_streams` functions may drop unsubscribed streams.
%% The shared subscription handler must get a chance to see unsubscribed
%% streams while they are still in the fully replayed state.
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:pre_renew_streams(S0, SharedSubS0),
S2 = emqx_persistent_session_ds_subs:gc(S1),
S3 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2),
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S3, SharedSubS1),
Interval = get_config(ClientInfo, [renew_streams_interval]),
Session = emqx_session:ensure_timer(
?TIMER_GET_STREAMS,
@ -757,7 +761,7 @@ skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
%%--------------------------------------------------------------------
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
disconnect(Session = #{id := Id, s := S0, shared_sub_s := SharedSubS0}, ConnInfo) ->
S1 = maybe_set_offline_info(S0, Id),
S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1),
S3 =
@ -767,8 +771,9 @@ disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
_ ->
S2
end,
S = emqx_persistent_session_ds_state:commit(S3),
{shutdown, Session#{s => S}}.
{S4, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_disconnect(S3, SharedSubS0),
S = emqx_persistent_session_ds_state:commit(S4),
{shutdown, Session#{s => S, shared_sub_s => SharedSubS}}.
-spec terminate(Reason :: term(), session()) -> ok.
terminate(_Reason, Session = #{id := Id, s := S}) ->
@ -816,10 +821,12 @@ list_client_subscriptions(ClientId) ->
{error, not_found}
end.
-spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) ->
-spec get_client_subscription(emqx_types:clientid(), topic_filter() | share_topic_filter()) ->
subscription() | undefined.
get_client_subscription(ClientId, Topic) ->
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic).
get_client_subscription(ClientId, #share{} = ShareTopicFilter) ->
emqx_persistent_session_ds_shared_subs:cold_get_subscription(ClientId, ShareTopicFilter);
get_client_subscription(ClientId, TopicFilter) ->
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, TopicFilter).
%%--------------------------------------------------------------------
%% Session tables operations
@ -986,14 +993,14 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
%% Normal replay:
%%--------------------------------------------------------------------
fetch_new_messages(Session0 = #{s := S0}, ClientInfo) ->
LFS = maps:get(last_fetched_stream, Session0, beginning),
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S0),
fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) ->
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0),
Session1 = Session0#{s => S1, shared_sub_s => SharedSubS1},
LFS = maps:get(last_fetched_stream, Session1, beginning),
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1),
BatchSize = get_config(ClientInfo, [batch_size]),
Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo),
#{s := S1, shared_sub_s := SharedSubS0} = Session1,
{S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0),
Session1#{s => S2, shared_sub_s => SharedSubS1}.
Session2 = fetch_new_messages(ItStream, BatchSize, Session1, ClientInfo),
Session2#{shared_sub_s => SharedSubS1}.
fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
#{inflight := Inflight} = Session0,

View File

@ -17,7 +17,7 @@
-module(emqx_persistent_session_ds_router).
-include("emqx.hrl").
-include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
-include("emqx_ps_ds_int.hrl").
-export([init_tables/0]).
@ -47,7 +47,7 @@
-endif.
-type route() :: #ps_route{}.
-type dest() :: emqx_persistent_session_ds:id().
-type dest() :: emqx_persistent_session_ds:id() | #share_dest{}.
-export_type([dest/0, route/0]).
@ -161,7 +161,7 @@ topics() ->
print_routes(Topic) ->
lists:foreach(
fun(#ps_route{topic = To, dest = Dest}) ->
io:format("~ts -> ~ts~n", [To, Dest])
io:format("~ts -> ~tp~n", [To, Dest])
end,
match_routes(Topic)
).
@ -247,6 +247,8 @@ mk_filtertab_fold_fun(FoldFun) ->
match_filters(Topic) ->
emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []).
get_dest_session_id(#share_dest{session_id = DSSessionId}) ->
DSSessionId;
get_dest_session_id({_, DSSessionId}) ->
DSSessionId;
get_dest_session_id(DSSessionId) ->

View File

@ -2,11 +2,37 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc This module
%% * handles creation and management of _shared_ subscriptions for the session;
%% * provides streams to the session;
%% * handles progress of stream replay.
%%
%% The logic is mostly straightforward; most parts resemble the logic of
%% `emqx_persistent_session_ds_subs` (subscribe/unsubscribe) and
%% `emqx_persistent_session_ds_scheduler` (providing new streams),
%% except that some data is exchanged with the `emqx_persistent_session_ds_shared_subs_agent`,
%% which communicates with remote shared subscription leaders.
%%
%% A tricky part is the concept of "scheduled actions". When we unsubscribe from a topic,
%% some streams may still have unacked messages, so we have no reliable progress for them.
%% Sending the current progress to the leader and disconnecting would lead to message
%% duplication. Therefore, after unsubscription we wait until all such streams are acked,
%% and only then disconnect from the leader.
%%
%% For this purpose the module keeps a `scheduled_actions` map in its state, holding the
%% streams we need to wait for together with their collected progress.
%% We also use `scheduled_actions` for resubscriptions: if a client resubscribes quickly
%% after unsubscribing, the mentioned streams may still be unacked. If we abandoned them
%% and simply reconnected to the leader, it might lease us the same streams again with the
%% previous progress, and messages would be duplicated.
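A sketch of the `scheduled_actions` value shape described above (all values hypothetical, matching the types declared below):

%% #{#share{group = <<"grp">>, topic = <<"t/#">>} =>
%%       #{type => {schedule_subscribe, #{qos => 1}},
%%         stream_keys_to_wait => [{SubId, Stream}],
%%         progresses => [#{stream => Stream,
%%                          progress => #{iterator => It},
%%                          use_finished => false}]}}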
-module(emqx_persistent_session_ds_shared_subs).
-include("emqx_mqtt.hrl").
-include("emqx.hrl").
-include("logger.hrl").
-include("session_internals.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-export([
@ -15,16 +41,56 @@
on_subscribe/3,
on_unsubscribe/4,
on_disconnect/2,
on_streams_replayed/2,
on_streams_replay/2,
on_info/3,
pre_renew_streams/2,
renew_streams/2,
to_map/2
]).
%% Management API:
-export([
cold_get_subscription/2
]).
-export([
format_lease_events/1,
format_stream_progresses/1
]).
-define(schedule_subscribe, schedule_subscribe).
-define(schedule_unsubscribe, schedule_unsubscribe).
-type stream_key() :: {emqx_persistent_session_ds:id(), emqx_ds:stream()}.
-type scheduled_action_type() ::
{?schedule_subscribe, emqx_types:subopts()} | ?schedule_unsubscribe.
-type agent_stream_progress() :: #{
stream := emqx_ds:stream(),
progress := progress(),
use_finished := boolean()
}.
-type progress() ::
#{
iterator := emqx_ds:iterator()
}.
-type scheduled_action() :: #{
type := scheduled_action_type(),
stream_keys_to_wait := [stream_key()],
progresses := [agent_stream_progress()]
}.
-type t() :: #{
agent := emqx_persistent_session_ds_shared_subs_agent:t()
agent := emqx_persistent_session_ds_shared_subs_agent:t(),
scheduled_actions := #{
share_topic_filter() => scheduled_action()
}
}.
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{
@ -34,184 +100,90 @@
-define(rank_x, rank_shared).
-define(rank_y, 0).
-export_type([
progress/0,
agent_stream_progress/0
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% new
-spec new(opts()) -> t().
new(Opts) ->
#{
agent => emqx_persistent_session_ds_shared_subs_agent:new(
agent_opts(Opts)
)
),
scheduled_actions => #{}
}.
%%--------------------------------------------------------------------
%% open
-spec open(emqx_persistent_session_ds_state:t(), opts()) ->
{ok, emqx_persistent_session_ds_state:t(), t()}.
open(S, Opts) ->
open(S0, Opts) ->
SharedSubscriptions = fold_shared_subs(
fun(#share{} = TopicFilter, Sub, Acc) ->
[{TopicFilter, to_agent_subscription(S, Sub)} | Acc]
fun(#share{} = ShareTopicFilter, Sub, Acc) ->
[{ShareTopicFilter, to_agent_subscription(S0, Sub)} | Acc]
end,
[],
S
S0
),
Agent = emqx_persistent_session_ds_shared_subs_agent:open(
SharedSubscriptions, agent_opts(Opts)
),
SharedSubS = #{agent => Agent},
{ok, S, SharedSubS}.
SharedSubS = #{agent => Agent, scheduled_actions => #{}},
S1 = revoke_all_streams(S0),
{ok, S1, SharedSubS}.
%%--------------------------------------------------------------------
%% on_subscribe
-spec on_subscribe(
share_topic_filter(),
emqx_types:subopts(),
emqx_persistent_session_ds:session()
) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}.
on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) ->
Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S),
on_subscribe(Subscription, TopicFilter, SubOpts, Session).
-spec on_unsubscribe(
emqx_persistent_session_ds:id(),
emqx_persistent_session_ds:topic_filter(),
emqx_persistent_session_ds_state:t(),
t()
) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}.
on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) ->
case lookup(TopicFilter, S0) of
undefined ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
Subscription ->
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, topic_filter => TopicFilter
}),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent0, TopicFilter
),
SharedSubS = SharedSubS0#{agent => Agent1},
S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0),
{ok, S, SharedSubS, Subscription}
end.
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0
),
?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}),
S1 = lists:foldl(
fun
(#{type := lease} = Event, S) -> accept_stream(Event, S);
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
end,
S0,
StreamLeaseEvents
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S1, SharedSubS1}.
-spec on_streams_replayed(
emqx_persistent_session_ds_state:t(),
t()
) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
%% TODO
%% Is it sufficient for a report?
Progress = fold_shared_stream_states(
fun(TopicFilter, Stream, SRS, Acc) ->
#srs{it_begin = BeginIt} = SRS,
StreamProgress = #{
topic_filter => TopicFilter,
stream => Stream,
iterator => BeginIt
},
[StreamProgress | Acc]
end,
[],
S
),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progress
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
{emqx_persistent_session_ds_state:t(), t()}.
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
to_map(_S, _SharedSubS) ->
%% TODO
#{}.
on_subscribe(#share{} = ShareTopicFilter, SubOpts, #{s := S} = Session) ->
Subscription = emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S),
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% on_subscribe internal functions
fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(
fun
(#share{} = TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0);
(_, _Sub, Acc0) -> Acc0
end,
Acc,
S
).
fold_shared_stream_states(Fun, Acc, S) ->
%% TODO
%% Optimize or cache
TopicFilters = fold_shared_subs(
fun
(#share{} = TopicFilter, #{id := Id} = _Sub, Acc0) ->
Acc0#{Id => TopicFilter};
(_, _, Acc0) ->
Acc0
end,
#{},
S
),
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, Stream}, SRS, Acc0) ->
case TopicFilters of
#{SubId := TopicFilter} ->
Fun(TopicFilter, Stream, SRS, Acc0);
_ ->
Acc0
end
end,
Acc,
S
).
on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
on_subscribe(undefined, ShareTopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
#{max_subscriptions := MaxSubscriptions} = Props,
case emqx_persistent_session_ds_state:n_subscriptions(S) < MaxSubscriptions of
true ->
create_new_subscription(TopicFilter, SubOpts, Session);
create_new_subscription(ShareTopicFilter, SubOpts, Session);
false ->
{error, ?RC_QUOTA_EXCEEDED}
end;
on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
update_subscription(Subscription, TopicFilter, SubOpts, Session).
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) ->
update_subscription(Subscription, ShareTopicFilter, SubOpts, Session).
-dialyzer({nowarn_function, create_new_subscription/3}).
create_new_subscription(TopicFilter, SubOpts, #{
id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props
create_new_subscription(#share{topic = TopicFilter, group = Group} = ShareTopicFilter, SubOpts, #{
id := SessionId,
s := S0,
shared_sub_s := #{agent := Agent} = SharedSubS0,
props := Props
}) ->
case
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent0, TopicFilter, SubOpts
emqx_persistent_session_ds_shared_subs_agent:can_subscribe(
Agent, ShareTopicFilter, SubOpts
)
of
{ok, Agent1} ->
ok ->
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, #share_dest{
session_id = SessionId, group = Group
}),
_ = emqx_external_broker:add_persistent_shared_route(TopicFilter, Group, SessionId),
#{upgrade_qos := UpgradeQoS} = Props,
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
@ -227,20 +199,20 @@ create_new_subscription(TopicFilter, SubOpts, #{
start_time => now_ms()
},
S = emqx_persistent_session_ds_state:put_subscription(
TopicFilter, Subscription, S3
ShareTopicFilter, Subscription, S3
),
SharedSubS = SharedSubS0#{agent => Agent1},
?tp(persistent_session_ds_shared_subscription_added, #{
topic_filter => TopicFilter, session => SessionId
}),
SharedSubS = schedule_subscribe(SharedSubS0, ShareTopicFilter, SubOpts),
{ok, S, SharedSubS};
{error, _} = Error ->
Error
end.
update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilter, SubOpts, #{
s := S0, shared_sub_s := SharedSubS, props := Props
}) ->
update_subscription(
#{current_state := SStateId0, id := SubId} = Sub0, ShareTopicFilter, SubOpts, #{
s := S0, shared_sub_s := SharedSubS, props := Props
}
) ->
#{upgrade_qos := UpgradeQoS} = Props,
SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
@ -254,36 +226,173 @@ update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilt
SStateId, SState, S1
),
Sub = Sub0#{current_state => SStateId},
S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
S = emqx_persistent_session_ds_state:put_subscription(ShareTopicFilter, Sub, S2),
{ok, S, SharedSubS}
end.
lookup(TopicFilter, S) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of
Sub = #{current_state := SStateId} ->
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
#{subopts := SubOpts} ->
Sub#{subopts => SubOpts};
undefined ->
undefined
end;
-dialyzer({nowarn_function, schedule_subscribe/3}).
schedule_subscribe(
#{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0,
ShareTopicFilter,
SubOpts
) ->
case ScheduledActions0 of
#{ShareTopicFilter := ScheduledAction} ->
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
},
?tp(debug, shared_subs_schedule_subscribe_override, #{
share_topic_filter => ShareTopicFilter,
new_type => {?schedule_subscribe, SubOpts},
old_action => format_schedule_action(ScheduledAction)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
?tp(debug, shared_subs_schedule_subscribe_new, #{
share_topic_filter => ShareTopicFilter, subopts => SubOpts
}),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent0, ShareTopicFilter, SubOpts
),
SharedSubS0#{agent => Agent1}
end.
%%--------------------------------------------------------------------
%% on_unsubscribe
-spec on_unsubscribe(
emqx_persistent_session_ds:id(),
share_topic_filter(),
emqx_persistent_session_ds_state:t(),
t()
) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}.
on_unsubscribe(
SessionId, #share{topic = TopicFilter, group = Group} = ShareTopicFilter, S0, SharedSubS0
) ->
case lookup(ShareTopicFilter, S0) of
undefined ->
undefined
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
#{id := SubId} = Subscription ->
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, share_topic_filter => ShareTopicFilter
}),
_ = emqx_external_broker:delete_persistent_shared_route(TopicFilter, Group, SessionId),
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, #share_dest{
session_id = SessionId, group = Group
}),
S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0),
SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter),
{ok, S, SharedSubS, Subscription}
end.
%%--------------------------------------------------------------------
%% on_unsubscribe internal functions
schedule_unsubscribe(
S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscribedSubId, ShareTopicFilter
) ->
case ScheduledActions0 of
#{ShareTopicFilter := ScheduledAction0} ->
ScheduledAction1 = ScheduledAction0#{type => ?schedule_unsubscribe},
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => ScheduledAction1
},
?tp(debug, shared_subs_schedule_unsubscribe_override, #{
share_topic_filter => ShareTopicFilter,
new_type => ?schedule_unsubscribe,
old_action => format_schedule_action(ScheduledAction0)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
StreamKeys = stream_keys_by_sub_id(S, UnsubscribedSubId),
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => #{
type => ?schedule_unsubscribe,
stream_keys_to_wait => StreamKeys,
progresses => []
}
},
?tp(debug, shared_subs_schedule_unsubscribe_new, #{
share_topic_filter => ShareTopicFilter,
stream_keys => format_stream_keys(StreamKeys)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1}
end.
%%--------------------------------------------------------------------
%% pre_renew_streams
-spec pre_renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
pre_renew_streams(S, SharedSubS) ->
on_streams_replay(S, SharedSubS).
%%--------------------------------------------------------------------
%% renew_streams
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = SharedSubS0) ->
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0
),
StreamLeaseEvents =/= [] andalso
?tp(debug, shared_subs_new_stream_lease_events, #{
stream_lease_events => format_lease_events(StreamLeaseEvents)
}),
S1 = lists:foldl(
fun
(#{type := lease} = Event, S) -> accept_stream(Event, S, ScheduledActions);
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
end,
S0,
StreamLeaseEvents
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S1, SharedSubS1}.
%%--------------------------------------------------------------------
%% renew_streams internal functions
accept_stream(#{share_topic_filter := ShareTopicFilter} = Event, S, ScheduledActions) ->
%% If we have a pending action (subscribe or unsubscribe) for this topic filter,
%% we should not accept a stream and start replaying it. We won't use it anyway:
%% * if a subscribe is pending, we will reset the agent and obtain a new lease;
%% * if an unsubscribe is pending, we will drop the connection.
case ScheduledActions of
#{ShareTopicFilter := _Action} ->
S;
_ ->
accept_stream(Event, S)
end.
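For illustration (hypothetical values): with a pending action for the filter the lease is ignored, otherwise it falls through to `accept_stream/2` below:

%% Pending = #{ShareTopicFilter => #{type => schedule_unsubscribe}},
%% accept_stream(Event, S, Pending)  %=> S (lease ignored)
%% accept_stream(Event, S, #{})      %=> accept_stream(Event, S)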
accept_stream(
#{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0
#{
share_topic_filter := ShareTopicFilter,
stream := Stream,
progress := #{iterator := Iterator} = _Progress
} = _Event,
S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
undefined ->
%% This should not happen.
%% Agent should have received unsubscribe callback
%% and should not have passed this stream as a new one
error(new_stream_without_sub);
%% We unsubscribed
S0;
#{id := SubId, current_state := SStateId} ->
Key = {SubId, Stream},
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
undefined ->
NeedCreateStream =
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
undefined ->
true;
#srs{unsubscribed = true} ->
true;
_SRS ->
false
end,
case NeedCreateStream of
true ->
NewSRS =
#srs{
rank_x = ?rank_x,
@ -294,15 +403,15 @@ accept_stream(
},
S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0),
S1;
_SRS ->
false ->
S0
end
end.
revoke_stream(
#{topic_filter := TopicFilter, stream := Stream}, S0
#{share_topic_filter := ShareTopicFilter, stream := Stream}, S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
undefined ->
%% This should not happen.
%% Agent should have received unsubscribe callback
@ -320,19 +429,363 @@ revoke_stream(
end
end.
-spec to_agent_subscription(
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()
%%--------------------------------------------------------------------
%% on_streams_replay
-spec on_streams_replay(
emqx_persistent_session_ds_state:t(),
t()
) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replay(S0, SharedSubS0) ->
{S1, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS1} =
renew_streams(S0, SharedSubS0),
Progresses = all_stream_progresses(S1, Agent0),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progresses
),
{Agent2, ScheduledActions1} = run_scheduled_actions(S1, Agent1, ScheduledActions0),
SharedSubS2 = SharedSubS1#{
agent => Agent2,
scheduled_actions => ScheduledActions1
},
{S1, SharedSubS2}.
%%--------------------------------------------------------------------
%% on_streams_replay internal functions
all_stream_progresses(S, Agent) ->
all_stream_progresses(S, Agent, _NeedUnacked = false).
all_stream_progresses(S, _Agent, NeedUnacked) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
fold_shared_stream_states(
fun(ShareTopicFilter, Stream, SRS, ProgressesAcc0) ->
case
is_stream_started(CommQos1, CommQos2, SRS) and
(NeedUnacked or is_stream_fully_acked(CommQos1, CommQos2, SRS))
of
true ->
StreamProgress = stream_progress(CommQos1, CommQos2, Stream, SRS),
maps:update_with(
ShareTopicFilter,
fun(Progresses) -> [StreamProgress | Progresses] end,
[StreamProgress],
ProgressesAcc0
);
false ->
ProgressesAcc0
end
end,
#{},
S
).
run_scheduled_actions(S, Agent, ScheduledActions) ->
maps:fold(
fun(ShareTopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) ->
case run_scheduled_action(S, AgentAcc0, ShareTopicFilter, Action0) of
{ok, AgentAcc1} ->
{AgentAcc1, maps:remove(ShareTopicFilter, ScheduledActionsAcc)};
{continue, Action1} ->
{AgentAcc0, ScheduledActionsAcc#{ShareTopicFilter => Action1}}
end
end,
{Agent, ScheduledActions},
ScheduledActions
).
run_scheduled_action(
S,
Agent0,
ShareTopicFilter,
#{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action
) ->
emqx_persistent_session_ds_shared_subs_agent:subscription().
to_agent_subscription(_S, Subscription) ->
StreamKeysToWait1 = filter_unfinished_streams(S, StreamKeysToWait0),
Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0,
case StreamKeysToWait1 of
[] ->
?tp(debug, shared_subs_schedule_action_complete, #{
share_topic_filter => ShareTopicFilter,
progresses => format_stream_progresses(Progresses1),
type => Type
}),
%% Regular progress reporting won't see unsubscribed streams, so we need to
%% send their progress explicitly.
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, #{ShareTopicFilter => Progresses1}
),
case Type of
{?schedule_subscribe, SubOpts} ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent1, ShareTopicFilter, SubOpts
)};
?schedule_unsubscribe ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent1, ShareTopicFilter, Progresses1
)}
end;
_ ->
Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
?tp(debug, shared_subs_schedule_action_continue, #{
share_topic_filter => ShareTopicFilter,
new_action => format_schedule_action(Action1)
}),
{continue, Action1}
end.
filter_unfinished_streams(S, StreamKeysToWait) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
lists:filter(
fun(Key) ->
case emqx_persistent_session_ds_state:get_stream(Key, S) of
undefined ->
%% This should not happen: we should observe every stream
%% in the completed state before it is deleted
true;
SRS ->
not is_stream_fully_acked(CommQos1, CommQos2, SRS)
end
end,
StreamKeysToWait
).
stream_progresses(S, StreamKeys) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
lists:map(
fun({_SubId, Stream} = Key) ->
SRS = emqx_persistent_session_ds_state:get_stream(Key, S),
stream_progress(CommQos1, CommQos2, Stream, SRS)
end,
StreamKeys
).
%%--------------------------------------------------------------------
%% on_disconnect
on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
S1 = revoke_all_streams(S0),
Progresses = all_stream_progresses(S1, Agent0, _NeedUnacked = true),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses),
SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}},
{S1, SharedSubS1}.
%%--------------------------------------------------------------------
%% on_disconnect helpers
revoke_all_streams(S0) ->
fold_shared_stream_states(
fun(ShareTopicFilter, Stream, _SRS, S) ->
revoke_stream(#{share_topic_filter => ShareTopicFilter, stream => Stream}, S)
end,
S0,
S0
).
%%--------------------------------------------------------------------
%% on_info
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
{emqx_persistent_session_ds_state:t(), t()}.
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
%%--------------------------------------------------------------------
%% to_map
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
to_map(S, _SharedSubS) ->
fold_shared_subs(
fun(ShareTopicFilter, _, Acc) -> Acc#{ShareTopicFilter => lookup(ShareTopicFilter, S)} end,
#{},
S
).
%%--------------------------------------------------------------------
%% cold_get_subscription
-spec cold_get_subscription(emqx_persistent_session_ds:id(), share_topic_filter()) ->
emqx_persistent_session_ds:subscription() | undefined.
cold_get_subscription(SessionId, ShareTopicFilter) ->
case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, ShareTopicFilter) of
[Sub = #{current_state := SStateId}] ->
case
emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId)
of
[#{subopts := Subopts}] ->
Sub#{subopts => Subopts};
_ ->
undefined
end;
_ ->
undefined
end.
%%--------------------------------------------------------------------
%% Generic helpers
%%--------------------------------------------------------------------
lookup(ShareTopicFilter, S) ->
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S) of
Sub = #{current_state := SStateId} ->
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
#{subopts := SubOpts} ->
Sub#{subopts => SubOpts};
undefined ->
undefined
end;
undefined ->
undefined
end.
stream_keys_by_sub_id(S, MatchSubId) ->
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, _Stream} = StreamKey, _SRS, StreamKeys) ->
case SubId of
MatchSubId ->
[StreamKey | StreamKeys];
_ ->
StreamKeys
end
end,
[],
S
).
stream_progress(
CommQos1,
CommQos2,
Stream,
#srs{
it_end = EndIt,
it_begin = BeginIt
} = SRS
) ->
Iterator =
case is_stream_fully_acked(CommQos1, CommQos2, SRS) of
true -> EndIt;
false -> BeginIt
end,
#{
stream => Stream,
progress => #{
iterator => Iterator
},
use_finished => is_use_finished(SRS)
}.
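Hypothetical values showing the iterator choice above:

%% fully acked stream:     progress => #{iterator => EndIt}    (resume after the batch)
%% partially acked stream: progress => #{iterator => BeginIt}  (the whole batch will be replayed)
%% use_finished mirrors the `unsubscribed` flag of the SRS.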
fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(
fun
(#share{} = ShareTopicFilter, Sub, Acc0) -> Fun(ShareTopicFilter, Sub, Acc0);
(_, _Sub, Acc0) -> Acc0
end,
Acc,
S
).
fold_shared_stream_states(Fun, Acc, S) ->
%% TODO
%% do we need anything from sub state?
%% Optimize or cache
ShareTopicFilters = fold_shared_subs(
fun
(#share{} = ShareTopicFilter, #{id := Id} = _Sub, Acc0) ->
Acc0#{Id => ShareTopicFilter};
(_, _, Acc0) ->
Acc0
end,
#{},
S
),
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, Stream}, SRS, Acc0) ->
case ShareTopicFilters of
#{SubId := ShareTopicFilter} ->
Fun(ShareTopicFilter, Stream, SRS, Acc0);
_ ->
Acc0
end
end,
Acc,
S
).
to_agent_subscription(_S, Subscription) ->
maps:with([start_time], Subscription).
-spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts().
agent_opts(#{session_id := SessionId}) ->
#{session_id => SessionId}.
-dialyzer({nowarn_function, now_ms/0}).
now_ms() ->
erlang:system_time(millisecond).
is_use_finished(#srs{unsubscribed = Unsubscribed}) ->
Unsubscribed.
is_stream_started(CommQos1, CommQos2, #srs{first_seqno_qos1 = Q1, last_seqno_qos1 = Q2}) ->
(CommQos1 >= Q1) or (CommQos2 >= Q2).
is_stream_fully_acked(_, _, #srs{
first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2
}) ->
%% Streams whose last chunk contains no QoS1 or QoS2
%% messages are considered fully acked:
true;
is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
(Comm1 >= S1) andalso (Comm2 >= S2).
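A numeric illustration with hypothetical seqnos:

%% SRS = #srs{first_seqno_qos1 = 3, last_seqno_qos1 = 7,
%%            first_seqno_qos2 = 2, last_seqno_qos2 = 2},
%% is_stream_fully_acked(7, 2, SRS)  %=> true   (7 >= 7 andalso 2 >= 2)
%% is_stream_fully_acked(5, 2, SRS)  %=> false  (5 < 7)
%% An SRS with first == last for both QoS levels matches the first clause
%% and is fully acked by definition.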
%%--------------------------------------------------------------------
%% Formatters
%%--------------------------------------------------------------------
format_schedule_action(#{
type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait
}) ->
#{
type => Type,
progresses => format_stream_progresses(Progresses),
stream_keys_to_wait => format_stream_keys(StreamKeysToWait)
}.
format_stream_progresses(Streams) ->
lists:map(
fun format_stream_progress/1,
Streams
).
format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
format_progress(#{iterator := Iterator} = Progress) ->
Progress#{iterator => format_opaque(Iterator)}.
format_stream_key(beginning) -> beginning;
format_stream_key({SubId, Stream}) -> {SubId, format_opaque(Stream)}.
format_stream_keys(StreamKeys) ->
lists:map(
fun format_stream_key/1,
StreamKeys
).
format_lease_events(Events) ->
lists:map(
fun format_lease_event/1,
Events
).
format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
format_lease_event(#{stream := Stream} = Event) ->
Event#{stream => format_opaque(Stream)}.
format_opaque(Opaque) ->
erlang:phash2(Opaque).

View File

@ -15,7 +15,7 @@
}.
-type t() :: term().
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{
session_id := session_id()
@ -28,41 +28,44 @@
-type stream_lease() :: #{
type => lease,
%% Used as "external" subscription_id
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
}.
-type stream_revoke() :: #{
type => revoke,
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream()
}.
-type stream_lease_event() :: stream_lease() | stream_revoke().
-type stream_progress() :: #{
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
iterator := emqx_ds:iterator(),
use_finished := boolean()
}.
-export_type([
t/0,
subscription/0,
session_id/0,
stream_lease/0,
stream_lease_event/0,
opts/0
]).
-export([
new/1,
open/2,
can_subscribe/3,
on_subscribe/3,
on_unsubscribe/2,
on_unsubscribe/3,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
@ -77,12 +80,13 @@
%%--------------------------------------------------------------------
-callback new(opts()) -> t().
-callback open([{topic_filter(), subscription()}], opts()) -> t().
-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
{ok, t()} | {error, term()}.
-callback on_unsubscribe(t(), topic_filter()) -> t().
-callback open([{share_topic_filter(), subscription()}], opts()) -> t().
-callback can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
-callback on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
-callback on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
-callback on_disconnect(t(), [stream_progress()]) -> t().
-callback renew_streams(t()) -> {[stream_lease_event()], t()}.
-callback on_stream_progress(t(), [stream_progress()]) -> t().
-callback on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
-callback on_info(t(), term()) -> t().
%%--------------------------------------------------------------------
@ -93,24 +97,31 @@
new(Opts) ->
?shared_subs_agent:new(Opts).
-spec open([{topic_filter(), subscription()}], opts()) -> t().
-spec open([{share_topic_filter(), subscription()}], opts()) -> t().
open(Topics, Opts) ->
?shared_subs_agent:open(Topics, Opts).
-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
{ok, t()} | {error, emqx_types:reason_code()}.
on_subscribe(Agent, TopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts).
-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
can_subscribe(Agent, ShareTopicFilter, SubOpts) ->
?shared_subs_agent:can_subscribe(Agent, ShareTopicFilter, SubOpts).
-spec on_unsubscribe(t(), topic_filter()) -> t().
on_unsubscribe(Agent, TopicFilter) ->
?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
on_subscribe(Agent, ShareTopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, ShareTopicFilter, SubOpts).
-spec on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses) ->
?shared_subs_agent:on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses).
-spec on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
on_disconnect(Agent, StreamProgresses) ->
?shared_subs_agent:on_disconnect(Agent, StreamProgresses).
-spec renew_streams(t()) -> {[stream_lease_event()], t()}.
renew_streams(Agent) ->
?shared_subs_agent:renew_streams(Agent).
-spec on_stream_progress(t(), [stream_progress()]) -> t().
-spec on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
on_stream_progress(Agent, StreamProgress) ->
?shared_subs_agent:on_stream_progress(Agent, StreamProgress).

View File

@ -9,11 +9,13 @@
-export([
new/1,
open/2,
can_subscribe/3,
on_subscribe/3,
on_unsubscribe/2,
on_unsubscribe/3,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
@ -30,10 +32,16 @@ new(_Opts) ->
open(_Topics, _Opts) ->
undefined.
on_subscribe(_Agent, _TopicFilter, _SubOpts) ->
can_subscribe(_Agent, _TopicFilter, _SubOpts) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}.
on_unsubscribe(Agent, _TopicFilter) ->
on_subscribe(Agent, _TopicFilter, _SubOpts) ->
Agent.
on_unsubscribe(Agent, _TopicFilter, _Progresses) ->
Agent.
on_disconnect(Agent, _) ->
Agent.
renew_streams(Agent) ->

View File

@ -399,7 +399,9 @@ new_id(Rec) ->
get_subscription(TopicFilter, Rec) ->
gen_get(?subscriptions, TopicFilter, Rec).
-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
-spec cold_get_subscription(
emqx_persistent_session_ds:id(), emqx_types:topic() | emqx_types:share()
) ->
[emqx_persistent_session_ds_subs:subscription()].
cold_get_subscription(SessionId, Topic) ->
kv_pmap_read(?subscription_tab, SessionId, Topic).

View File

@ -21,7 +21,7 @@
-record(ps_route, {
topic :: binary(),
dest :: emqx_persistent_session_ds:id() | '_'
dest :: emqx_persistent_session_ds_router:dest() | '_'
}).
-record(ps_routeidx, {

View File

@ -21,6 +21,7 @@
%% Until the full implementation lands, we need to dispatch to the null agent.
%% It reports a "not implemented" error for attempts to use shared subscriptions.
-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
% -define(shared_subs_agent, emqx_ds_shared_sub_agent).
%% end of -ifdef(TEST).
-endif.

View File

@ -62,7 +62,7 @@
streams := [{pid(), quicer:stream_handle()}],
%% New stream opts
stream_opts := map(),
%% If conneciton is resumed from session ticket
%% If connection is resumed from session ticket
is_resumed => boolean(),
%% mqtt message serializer config
serialize => undefined,
@ -70,8 +70,8 @@
}.
-type cb_ret() :: quicer_lib:cb_ret().
%% @doc Data streams initializions are started in parallel with control streams, data streams are blocked
%% for the activation from control stream after it is accepted as a legit conneciton.
%% @doc Data streams initializations are started in parallel with control streams, data streams are blocked
%% for the activation from control stream after it is accepted as a legit connection.
%% For security, the initial number of allowed data streams from client should be limited by
%% 'peer_bidi_stream_count` & 'peer_unidi_stream_count`
-spec activate_data_streams(pid(), {
@ -80,7 +80,7 @@
activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
%% @doc conneciton owner init callback
%% @doc connection owner init callback
-spec init(map()) -> {ok, cb_state()}.
init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)});

View File

@ -589,6 +589,14 @@ ensure_valid_options(Options, Versions) ->
ensure_valid_options([], _, Acc) ->
lists:reverse(Acc);
ensure_valid_options([{K, undefined} | T], Versions, Acc) when
K =:= crl_check;
K =:= crl_cache
->
%% Note: we must set the CRL options to `undefined' to unset them. Otherwise,
%% if they were previously enabled, `esockd' will retain them when
%% `esockd:merge_opts/2' is called.
ensure_valid_options(T, Versions, [{K, undefined} | Acc]);
ensure_valid_options([{_, undefined} | T], Versions, Acc) ->
ensure_valid_options(T, Versions, Acc);
ensure_valid_options([{_, ""} | T], Versions, Acc) ->

View File

@ -17,7 +17,6 @@
-include("emqx_mqtt.hrl").
-export([format/2]).
-export([format_meta_map/1]).
%% logger_formatter:config/0 is not exported.
-type config() :: map().
@ -43,10 +42,6 @@ format(
format(Event, Config) ->
emqx_logger_textfmt:format(Event, Config).
format_meta_map(Meta) ->
Encode = emqx_trace_handler:payload_encode(),
format_meta_map(Meta, Encode).
format_meta_map(Meta, Encode) ->
format_meta_map(Meta, Encode, [
{packet, fun format_packet/2},

View File

@ -436,6 +436,7 @@ websocket_handle({Frame, _}, State) ->
%% TODO: should not close the ws connection
?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
shutdown(unexpected_ws_frame, State).
websocket_info({call, From, Req}, State) ->
handle_call(From, Req, State);
websocket_info({cast, rate_limit}, State) ->
@ -737,7 +738,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
input_bytes => Data
}),
FrameError = {frame_error, Reason},
{[{incoming, FrameError} | Packets], State};
NState = enrich_state(Reason, State),
{[{incoming, FrameError} | Packets], NState};
error:Reason:Stacktrace ->
?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState),
@ -830,7 +832,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
?LOG(warning, #{
msg => "packet_discarded",
reason => "frame_too_large",
packet => emqx_packet:format(Packet)
packet => Packet
}),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),
@ -1069,6 +1071,13 @@ check_max_connection(Type, Listener) ->
{denny, Reason}
end
end.
enrich_state(#{parse_state := NParseState}, State) ->
Serialize = emqx_frame:serialize_opts(NParseState),
State#state{parse_state = NParseState, serialize = Serialize};
enrich_state(_, State) ->
State.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------

View File

@ -414,24 +414,32 @@ t_handle_in_auth(_) ->
emqx_channel:handle_in(?AUTH_PACKET(), Channel).
t_handle_in_frame_error(_) ->
IdleChannel = channel(#{conn_state => idle}),
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan} =
emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, IdleChannel),
IdleChannelV5 = channel(#{conn_state => idle}),
%% no CONNACK packet for v4
?assertMatch(
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan},
emqx_channel:handle_in(
{frame_error, #{cause => frame_too_large}}, v4(IdleChannelV5)
)
),
ConnectingChan = channel(#{conn_state => connecting}),
ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
{shutdown,
#{
shutdown_count := frame_too_large,
cause := frame_too_large,
limit := 100,
received := 101
},
ConnackPacket,
_} =
?assertMatch(
{shutdown,
#{
shutdown_count := frame_too_large,
cause := frame_too_large,
limit := 100,
received := 101
},
ConnackPacket, _},
emqx_channel:handle_in(
{frame_error, #{cause => frame_too_large, received => 101, limit => 100}},
ConnectingChan
),
)
),
DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
ConnectedChan = channel(#{conn_state => connected}),
?assertMatch(

View File

@ -138,13 +138,14 @@ init_per_testcase(t_refresh_config = TestCase, Config) ->
];
init_per_testcase(TestCase, Config) when
TestCase =:= t_update_listener;
TestCase =:= t_update_listener_enable_disable;
TestCase =:= t_validations
->
ct:timetrap({seconds, 30}),
ok = snabbkaffe:start_trace(),
%% when running emqx standalone tests, we can't use those
%% features.
case does_module_exist(emqx_management) of
case does_module_exist(emqx_mgmt) of
true ->
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
@ -165,7 +166,7 @@ init_per_testcase(TestCase, Config) when
{emqx_conf, #{config => #{listeners => #{ssl => #{default => ListenerConf}}}}},
emqx,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
),
@ -206,6 +207,7 @@ read_crl(Filename) ->
end_per_testcase(TestCase, Config) when
TestCase =:= t_update_listener;
TestCase =:= t_update_listener_enable_disable;
TestCase =:= t_validations
->
Skip = proplists:get_bool(skip_does_not_apply, Config),
@ -1057,3 +1059,104 @@ do_t_validations(_Config) ->
),
ok.
%% Checks that if CRL is ever enabled and then disabled, clients can connect, even if they
%% would otherwise not have their corresponding CRLs cached and fail with `{bad_crls,
%% no_relevant_crls}`.
t_update_listener_enable_disable(Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ct:pal("skipping as this test does not apply in this profile"),
ok;
false ->
do_t_update_listener_enable_disable(Config)
end.
do_t_update_listener_enable_disable(Config) ->
DataDir = ?config(data_dir, Config),
Keyfile = filename:join([DataDir, "server.key.pem"]),
Certfile = filename:join([DataDir, "server.cert.pem"]),
Cacertfile = filename:join([DataDir, "ca-chain.cert.pem"]),
ClientCert = filename:join(DataDir, "client.cert.pem"),
ClientKey = filename:join(DataDir, "client.key.pem"),
ListenerId = "ssl:default",
%% Enable CRL
{ok, {{_, 200, _}, _, ListenerData0}} = get_listener_via_api(ListenerId),
CRLConfig0 =
#{
<<"ssl_options">> =>
#{
<<"keyfile">> => Keyfile,
<<"certfile">> => Certfile,
<<"cacertfile">> => Cacertfile,
<<"enable_crl_check">> => true,
<<"fail_if_no_peer_cert">> => true
}
},
ListenerData1 = emqx_utils_maps:deep_merge(ListenerData0, CRLConfig0),
{ok, {_, _, ListenerData2}} = update_listener_via_api(ListenerId, ListenerData1),
?assertMatch(
#{
<<"ssl_options">> :=
#{
<<"enable_crl_check">> := true,
<<"verify">> := <<"verify_peer">>,
<<"fail_if_no_peer_cert">> := true
}
},
ListenerData2
),
%% Disable CRL
CRLConfig1 =
#{
<<"ssl_options">> =>
#{
<<"keyfile">> => Keyfile,
<<"certfile">> => Certfile,
<<"cacertfile">> => Cacertfile,
<<"enable_crl_check">> => false,
<<"fail_if_no_peer_cert">> => true
}
},
ListenerData3 = emqx_utils_maps:deep_merge(ListenerData2, CRLConfig1),
redbug:start(
[
"esockd_server:get_listener_prop -> return",
"esockd_server:set_listener_prop -> return",
"esockd:merge_opts -> return",
"esockd_listener_sup:set_options -> return",
"emqx_listeners:inject_crl_config -> return"
],
[{msgs, 100}]
),
{ok, {_, _, ListenerData4}} = update_listener_via_api(ListenerId, ListenerData3),
?assertMatch(
#{
<<"ssl_options">> :=
#{
<<"enable_crl_check">> := false,
<<"verify">> := <<"verify_peer">>,
<<"fail_if_no_peer_cert">> := true
}
},
ListenerData4
),
%% Now the client that would otherwise be blocked tries to connect and should be allowed.
{ok, C} = emqtt:start_link([
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
?assertMatch({ok, _}, emqtt:connect(C)),
emqtt:stop(C),
?assertNotReceive({http_get, _}),
ok.

View File

@ -391,6 +391,8 @@ default_appspec(emqx_schema_validation, _SuiteOpts) ->
#{schema_mod => emqx_schema_validation_schema, config => #{}};
default_appspec(emqx_message_transformation, _SuiteOpts) ->
#{schema_mod => emqx_message_transformation_schema, config => #{}};
default_appspec(emqx_ds_shared_sub, _SuiteOpts) ->
#{schema_mod => emqx_ds_shared_sub_schema, config => #{}};
default_appspec(_, _) ->
#{}.

View File

@ -56,6 +56,8 @@ t_exclusive_sub(_) ->
{ok, _} = emqtt:connect(C1),
?CHECK_SUB(C1, 0),
?CHECK_SUB(C1, 0),
{ok, C2} = emqtt:start_link([
{clientid, <<"client2">>},
{clean_start, false},

View File

@ -63,6 +63,7 @@ groups() ->
t_parse_malformed_properties,
t_malformed_connect_header,
t_malformed_connect_data,
t_malformed_connect_data_proto_ver,
t_reserved_connect_flag,
t_invalid_clientid,
t_undefined_password,
@ -167,6 +168,8 @@ t_parse_malformed_utf8_string(_) ->
ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}),
?ASSERT_FRAME_THROW(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
%% TODO: parse v3 with 0 length clientid
t_serialize_parse_v3_connect(_) ->
Bin =
<<16, 37, 0, 6, 77, 81, 73, 115, 100, 112, 3, 2, 0, 60, 0, 23, 109, 111, 115, 113, 112, 117,
@ -324,7 +327,7 @@ t_serialize_parse_bridge_connect(_) ->
header = #mqtt_packet_header{type = ?CONNECT},
variable = #mqtt_packet_connect{
clientid = <<"C_00:0C:29:2B:77:52">>,
proto_ver = 16#03,
proto_ver = ?MQTT_PROTO_V3,
proto_name = <<"MQIsdp">>,
is_bridge = true,
will_retain = true,
@ -686,15 +689,36 @@ t_malformed_connect_header(_) ->
).
t_malformed_connect_data(_) ->
ProtoNameWithLen = <<0, 6, "MQIsdp">>,
ConnectFlags = <<2#00000000>>,
ClientIdwithLen = <<0, 1, "a">>,
UnexpectedRestBin = <<0, 1, 2>>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, unexpected_trailing_bytes := _},
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 0, 0, 0, 0, 0, 0>>)
#{cause := malformed_connect, unexpected_trailing_bytes := 3},
emqx_frame:parse(
<<16, 18, ProtoNameWithLen/binary, ?MQTT_PROTO_V3, ConnectFlags/binary, 0, 0,
ClientIdwithLen/binary, UnexpectedRestBin/binary>>
)
).
t_malformed_connect_data_proto_ver(_) ->
Proto3NameWithLen = <<0, 6, "MQIsdp">>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, header_bytes := <<>>},
emqx_frame:parse(<<16, 8, Proto3NameWithLen/binary>>)
),
ProtoNameWithLen = <<0, 4, "MQTT">>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, header_bytes := <<>>},
emqx_frame:parse(<<16, 6, ProtoNameWithLen/binary>>)
).
t_reserved_connect_flag(_) ->
?assertException(
throw,
{frame_parse_error, reserved_connect_flag},
{frame_parse_error, #{
cause := reserved_connect_flag, proto_ver := ?MQTT_PROTO_V3, proto_name := <<"MQIsdp">>
}},
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 1, 0, 0, 1, 0, 0>>)
).
@ -726,7 +750,7 @@ t_undefined_password(_) ->
},
variable = #mqtt_packet_connect{
proto_name = <<"MQTT">>,
proto_ver = 4,
proto_ver = ?MQTT_PROTO_V4,
is_bridge = false,
clean_start = true,
will_flag = false,
@ -774,7 +798,9 @@ t_invalid_will_retain(_) ->
54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>,
?assertException(
throw,
{frame_parse_error, invalid_will_retain},
{frame_parse_error, #{
cause := invalid_will_retain, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBin)
),
ok.
@ -796,22 +822,30 @@ t_invalid_will_qos(_) ->
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3))
),
ok.
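
For readers decoding the hand-built binaries in these tests, the assumed byte layout (reconstructed from the test data above, standard MQTT CONNECT framing):

%% <<16, 18, ...>> decomposes as:
%%   16 = 2#00010000             fixed header: packet type CONNECT, flags 0
%%   18                          remaining length in bytes
%%   0,6,"MQIsdp" or 0,4,"MQTT"  length-prefixed protocol name (v3 / v4+)
%%   1 byte protocol level, 1 byte connect flags, 2 bytes keep-alive,
%%   then the payload, starting with the length-prefixed client id.
Connect = <<16, 18, 0, 6, "MQIsdp", 3, 0, 0, 0, 0, 1, "a", 0, 1, 2>>.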

View File

@ -26,6 +26,7 @@
%% Have to use real msgs, as the schema is guarded by enum.
-define(THROTTLE_MSG, authorization_permission_denied).
-define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized).
-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error).
-define(TIME_WINDOW, <<"1s">>).
all() -> emqx_common_test_helpers:all(?MODULE).
@ -59,6 +60,11 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
emqx_config:delete_override_conf_files().
init_per_testcase(t_throttle_recoverable_msg, Config) ->
ok = snabbkaffe:start_trace(),
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}),
Config;
init_per_testcase(t_throttle_add_new_msg, Config) ->
ok = snabbkaffe:start_trace(),
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(t_throttle_recoverable_msg, _Config) ->
ok = snabbkaffe:stop(),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
ok;
end_per_testcase(t_throttle_add_new_msg, _Config) ->
ok = snabbkaffe:stop(),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
@ -101,8 +111,8 @@ t_throttle(_Config) ->
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -115,14 +125,48 @@ t_throttle(_Config) ->
[]
).
t_throttle_recoverable_msg(_Config) ->
ResourceId = <<"resource_id">>,
ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]),
?check_trace(
begin
%% Warm-up and block to increase the probability that next events
%% will be in the same throttling time window.
{ok, _} = ?block_until(
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG},
5000
),
{_, {ok, _}} = ?wait_async_action(
events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId),
#{
?snk_kind := log_throttler_dropped,
throttled_msg := ThrottledMsg
},
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
throttled_msg := ThrottledMsg,
dropped_count := 1
},
3000
)
end,
[]
).
t_throttle_add_new_msg(_Config) ->
?check_trace(
begin
{ok, _} = ?block_until(
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -137,10 +181,15 @@ t_throttle_add_new_msg(_Config) ->
t_throttle_no_msg(_Config) ->
%% Must simply pass with no crashes
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
timer:sleep(10),
?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
Pid = erlang:whereis(emqx_log_throttler),
?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
%% assert process is not restarted
?assertEqual(Pid, erlang:whereis(emqx_log_throttler)),
%% make a gen_call to ensure the process is alive
%% note: this call results in an 'unexpected_call' error log.
?assertEqual(ignored, gen_server:call(Pid, probe)),
ok.
t_update_time_window(_Config) ->
?check_trace(
@ -168,8 +217,8 @@ t_throttle_debug_primary_level(_Config) ->
#{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG},
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -187,10 +236,13 @@ t_throttle_debug_primary_level(_Config) ->
%%--------------------------------------------------------------------
events(Msg) ->
events(100, Msg).
events(100, Msg, undefined).
events(N, Msg) ->
[emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)].
events(Msg, Id) ->
events(100, Msg, Id).
events(N, Msg, Id) ->
[emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)].
module_exists(Mod) ->
case erlang:module_loaded(Mod) of
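
The suite now exercises allow/2 instead of allow/1; a hedged call-site sketch (the wrapper name and the use of ?SLOG are assumptions for illustration):

%% Assumed pattern: the second argument ('undefined' when not
%% applicable, a resource id here) scopes the throttle window, so two
%% resources emitting the same message are throttled independently.
maybe_log_unrecoverable(ResourceId, Details) ->
    case emqx_log_throttler:allow(unrecoverable_resource_error, ResourceId) of
        true -> ?SLOG(error, Details#{msg => unrecoverable_resource_error});
        false -> ok
    end.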

View File

@ -377,42 +377,60 @@ t_will_msg(_) ->
t_format(_) ->
io:format("~ts", [
emqx_packet:format(#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0},
variable = undefined
})
]),
io:format("~ts", [
emqx_packet:format(#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK}, variable = 1, payload = <<"payload">>
})
emqx_packet:format(
#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0},
variable = undefined
},
text
)
]),
io:format(
"~ts",
[
emqx_packet:format(
#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK},
variable = 1,
payload = <<"payload">>
},
text
)
]
),
io:format("~ts", [
emqx_packet:format(
?CONNECT_PACKET(#mqtt_packet_connect{
will_flag = true,
will_retain = true,
will_qos = ?QOS_2,
will_topic = <<"topic">>,
will_payload = <<"payload">>
})
?CONNECT_PACKET(
#mqtt_packet_connect{
will_flag = true,
will_retain = true,
will_qos = ?QOS_2,
will_topic = <<"topic">>,
will_payload = <<"payload">>
}
),
text
)
]),
io:format("~ts", [
emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}))
emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}), text)
]),
io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99))]),
io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER), text)]),
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1), text)]),
io:format("~ts", [
emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))
emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>), text)
]),
io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90))]),
io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128))]).
io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98), text)]),
io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99), text)]),
io:format("~ts", [
emqx_packet:format(
?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]), text
)
]),
io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]), text)]),
io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]), text)]),
io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90), text)]),
io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128), text)]).
t_parse_empty_publish(_) ->
%% 52: 0011(type=PUBLISH) 0100 (QoS=2)

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth, [
{description, "EMQX Authentication and authorization"},
{vsn, "0.3.3"},
{vsn, "0.3.4"},
{modules, []},
{registered, [emqx_auth_sup]},
{applications, [

View File

@ -477,9 +477,15 @@ authorize_deny(
sources()
) ->
authz_result().
authorize(Client, PubSub, Topic, _DefaultResult, Sources) ->
authorize(#{username := Username} = Client, PubSub, Topic, _DefaultResult, Sources) ->
case maps:get(is_superuser, Client, false) of
true ->
?tp(authz_skipped, #{reason => client_is_superuser, action => PubSub}),
?TRACE("AUTHZ", "authorization_skipped_as_superuser", #{
username => Username,
topic => Topic,
action => emqx_access_control:format_action(PubSub)
}),
emqx_metrics:inc(?METRIC_SUPERUSER),
{stop, #{result => allow, from => superuser}};
false ->

View File

@ -674,5 +674,77 @@ t_publish_last_will_testament_banned_client_connecting(_Config) ->
ok.
t_sikpped_as_superuser(_Config) ->
ClientInfo = #{
clientid => <<"clientid">>,
username => <<"username">>,
peerhost => {127, 0, 0, 1},
zone => default,
listener => {tcp, default},
is_superuser => true
},
?check_trace(
begin
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_0), <<"p/t/0">>)
),
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_1), <<"p/t/1">>)
),
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_2), <<"p/t/2">>)
),
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_0), <<"s/t/0">>)
),
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_1), <<"s/t/1">>)
),
?assertEqual(
allow,
emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_2), <<"s/t/2">>)
)
end,
fun(Trace) ->
?assertMatch(
[
#{
reason := client_is_superuser,
action := #{qos := ?QOS_0, action_type := publish}
},
#{
reason := client_is_superuser,
action := #{qos := ?QOS_1, action_type := publish}
},
#{
reason := client_is_superuser,
action := #{qos := ?QOS_2, action_type := publish}
},
#{
reason := client_is_superuser,
action := #{qos := ?QOS_0, action_type := subscribe}
},
#{
reason := client_is_superuser,
action := #{qos := ?QOS_1, action_type := subscribe}
},
#{
reason := client_is_superuser,
action := #{qos := ?QOS_2, action_type := subscribe}
}
],
?of_kind(authz_skipped, Trace)
),
ok
end
),
ok = snabbkaffe:stop().
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).

View File

@ -31,4 +31,6 @@
-define(AUTHN_TYPE, {?AUTHN_MECHANISM, ?AUTHN_BACKEND}).
-define(AUTHN_TYPE_SCRAM, {?AUTHN_MECHANISM_SCRAM, ?AUTHN_BACKEND}).
-define(AUTHN_DATA_FIELDS, [is_superuser, client_attrs, expire_at, acl]).
-endif.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_http, [
{description, "EMQX External HTTP API Authentication and Authorization"},
{vsn, "0.3.0"},
{vsn, "0.3.1"},
{registered, []},
{mod, {emqx_auth_http_app, []}},
{applications, [

View File

@ -25,7 +25,7 @@
start(_StartType, _StartArgs) ->
ok = emqx_authz:register_source(?AUTHZ_TYPE, emqx_authz_http),
ok = emqx_authn:register_provider(?AUTHN_TYPE, emqx_authn_http),
ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_http),
ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_restapi),
{ok, Sup} = emqx_auth_http_sup:start_link(),
{ok, Sup}.

View File

@ -32,7 +32,9 @@
with_validated_config/2,
generate_request/2,
request_for_log/2,
response_for_log/1
response_for_log/1,
extract_auth_data/2,
safely_parse_body/2
]).
-define(DEFAULT_CONTENT_TYPE, <<"application/json">>).
@ -194,34 +196,14 @@ handle_response(Headers, Body) ->
case safely_parse_body(ContentType, Body) of
{ok, NBody} ->
body_to_auth_data(NBody);
{error, Reason} ->
?TRACE_AUTHN_PROVIDER(
error,
"parse_http_response_failed",
#{content_type => ContentType, body => Body, reason => Reason}
),
{error, _Reason} ->
ignore
end.
body_to_auth_data(Body) ->
case maps:get(<<"result">>, Body, <<"ignore">>) of
<<"allow">> ->
IsSuperuser = emqx_authn_utils:is_superuser(Body),
Attrs = emqx_authn_utils:client_attrs(Body),
try
ExpireAt = expire_at(Body),
ACL = acl(ExpireAt, Body),
Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
{ok, Result}
catch
throw:{bad_acl_rule, Reason} ->
%% it's an invalid token, so ok to log
?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}),
{error, bad_username_or_password};
throw:Reason ->
?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}),
{error, bad_username_or_password}
end;
extract_auth_data(http, Body);
<<"deny">> ->
{error, not_authorized};
<<"ignore">> ->
@ -230,6 +212,24 @@ body_to_auth_data(Body) ->
ignore
end.
extract_auth_data(Source, Body) ->
IsSuperuser = emqx_authn_utils:is_superuser(Body),
Attrs = emqx_authn_utils:client_attrs(Body),
try
ExpireAt = expire_at(Body),
ACL = acl(ExpireAt, Source, Body),
Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
{ok, Result}
catch
throw:{bad_acl_rule, Reason} ->
%% it's an invalid token, so ok to log
?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}),
{error, bad_username_or_password};
throw:Reason ->
?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}),
{error, bad_username_or_password}
end.
merge_maps([]) -> #{};
merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)).
@ -268,40 +268,43 @@ expire_sec(#{<<"expire_at">> := _}) ->
expire_sec(_) ->
undefined.
acl(#{expire_at := ExpireTimeMs}, #{<<"acl">> := Rules}) ->
acl(#{expire_at := ExpireTimeMs}, Source, #{<<"acl">> := Rules}) ->
#{
acl => #{
source_for_logging => http,
source_for_logging => Source,
rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules),
%% It's seconds level precision (like JWT) for authz
%% see emqx_authz_client_info:check/1
expire => erlang:convert_time_unit(ExpireTimeMs, millisecond, second)
}
};
acl(_NoExpire, #{<<"acl">> := Rules}) ->
acl(_NoExpire, Source, #{<<"acl">> := Rules}) ->
#{
acl => #{
source_for_logging => http,
source_for_logging => Source,
rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules)
}
};
acl(_, _) ->
acl(_, _, _) ->
#{}.
safely_parse_body(ContentType, Body) ->
try
parse_body(ContentType, Body)
catch
_Class:_Reason ->
_Class:Reason ->
?TRACE_AUTHN_PROVIDER(
error,
"parse_http_response_failed",
#{content_type => ContentType, body => Body, reason => Reason}
),
{error, invalid_body}
end.
parse_body(<<"application/json", _/binary>>, Body) ->
{ok, emqx_utils_json:decode(Body, [return_maps])};
parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) ->
Flags = [<<"result">>, <<"is_superuser">>],
RawMap = maps:from_list(cow_qs:parse_qs(Body)),
NBody = maps:with(Flags, RawMap),
NBody = maps:from_list(cow_qs:parse_qs(Body)),
{ok, NBody};
parse_body(ContentType, _) ->
{error, {unsupported_content_type, ContentType}}.

View File

@ -2,10 +2,19 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_authn_scram_http).
%% Note:
%% This is not an implementation of the RFC 7804:
%% Salted Challenge Response HTTP Authentication Mechanism.
%% This backend is an implementation of scram,
%% which uses an external web resource as a source of user information.
-include_lib("emqx_auth/include/emqx_authn.hrl").
-module(emqx_authn_scram_restapi).
-feature(maybe_expr, enable).
-include("emqx_auth_http.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_auth/include/emqx_authn.hrl").
-behaviour(emqx_authn_provider).
@ -22,10 +31,6 @@
<<"salt">>
]).
-define(OPTIONAL_USER_INFO_KEYS, [
<<"is_superuser">>
]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
@ -72,7 +77,9 @@ authenticate(
reason => Reason
})
end,
emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State);
emqx_utils_scram:authenticate(
AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, ?AUTHN_DATA_FIELDS
);
authenticate(_Credential, _State) ->
ignore.
@ -95,7 +102,7 @@ retrieve(
) ->
Request = emqx_authn_http:generate_request(Credential#{username := Username}, State),
Response = emqx_resource:simple_sync_query(ResourceId, {Method, Request, RequestTimeout}),
?TRACE_AUTHN_PROVIDER("scram_http_response", #{
?TRACE_AUTHN_PROVIDER("scram_restapi_response", #{
request => emqx_authn_http:request_for_log(Credential, State),
response => emqx_authn_http:response_for_log(Response),
resource => ResourceId
@ -113,16 +120,11 @@ retrieve(
handle_response(Headers, Body) ->
ContentType = proplists:get_value(<<"content-type">>, Headers),
case safely_parse_body(ContentType, Body) of
{ok, NBody} ->
body_to_user_info(NBody);
{error, Reason} = Error ->
?TRACE_AUTHN_PROVIDER(
error,
"parse_scram_http_response_failed",
#{content_type => ContentType, body => Body, reason => Reason}
),
Error
maybe
{ok, NBody} ?= emqx_authn_http:safely_parse_body(ContentType, Body),
{ok, UserInfo} ?= body_to_user_info(NBody),
{ok, AuthData} ?= emqx_authn_http:extract_auth_data(scram_restapi, NBody),
{ok, maps:merge(AuthData, UserInfo)}
end.
body_to_user_info(Body) ->
@ -131,26 +133,16 @@ body_to_user_info(Body) ->
true ->
case safely_convert_hex(Required0) of
{ok, Required} ->
UserInfo0 = maps:merge(Required, maps:with(?OPTIONAL_USER_INFO_KEYS, Body)),
UserInfo1 = emqx_utils_maps:safe_atom_key_map(UserInfo0),
UserInfo = maps:merge(#{is_superuser => false}, UserInfo1),
{ok, UserInfo};
{ok, emqx_utils_maps:safe_atom_key_map(Required)};
Error ->
?TRACE_AUTHN_PROVIDER("decode_keys_failed", #{http_body => Body}),
Error
end;
_ ->
?TRACE_AUTHN_PROVIDER("bad_response_body", #{http_body => Body}),
?TRACE_AUTHN_PROVIDER("missing_requried_keys", #{http_body => Body}),
{error, bad_response}
end.
safely_parse_body(ContentType, Body) ->
try
parse_body(ContentType, Body)
catch
_Class:_Reason ->
{error, invalid_body}
end.
safely_convert_hex(Required) ->
try
{ok,
@ -165,15 +157,5 @@ safely_convert_hex(Required) ->
{error, Reason}
end.
parse_body(<<"application/json", _/binary>>, Body) ->
{ok, emqx_utils_json:decode(Body, [return_maps])};
parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) ->
Flags = ?REQUIRED_USER_INFO_KEYS ++ ?OPTIONAL_USER_INFO_KEYS,
RawMap = maps:from_list(cow_qs:parse_qs(Body)),
NBody = maps:with(Flags, RawMap),
{ok, NBody};
parse_body(ContentType, _) ->
{error, {unsupported_content_type, ContentType}}.
merge_scram_conf(Conf, State) ->
maps:merge(maps:with([algorithm, iteration_count], Conf), State).
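
The rewritten handle_response/2 above leans on the OTP maybe expression (enabled by -feature(maybe_expr, enable)); a self-contained sketch of its semantics:

%% Each '?=' match must succeed for evaluation to continue; on a failed
%% match, the unmatched value becomes the result of the whole expression.
-module(maybe_demo).
-feature(maybe_expr, enable).
-export([split_pair/1]).

split_pair(Bin) ->
    maybe
        [A, B] ?= binary:split(Bin, <<",">>),
        {ok, {A, B}}
    end.
%% split_pair(<<"x,y">>) -> {ok, {<<"x">>, <<"y">>}}
%% split_pair(<<"xy">>)  -> [<<"xy">>]   (the failed match, returned as-is)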

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_authn_scram_http_schema).
-module(emqx_authn_scram_restapi_schema).
-behaviour(emqx_authn_schema).
@ -22,16 +22,16 @@
namespace() -> "authn".
refs() ->
[?R_REF(scram_http_get), ?R_REF(scram_http_post)].
[?R_REF(scram_restapi_get), ?R_REF(scram_restapi_post)].
select_union_member(
#{<<"mechanism">> := ?AUTHN_MECHANISM_SCRAM_BIN, <<"backend">> := ?AUTHN_BACKEND_BIN} = Value
) ->
case maps:get(<<"method">>, Value, undefined) of
<<"get">> ->
[?R_REF(scram_http_get)];
[?R_REF(scram_restapi_get)];
<<"post">> ->
[?R_REF(scramm_http_post)];
[?R_REF(scram_restapi_post)];
Else ->
throw(#{
reason => "unknown_http_method",
@ -43,20 +43,20 @@ select_union_member(
select_union_member(_Value) ->
undefined.
fields(scram_http_get) ->
fields(scram_restapi_get) ->
[
{method, #{type => get, required => true, desc => ?DESC(emqx_authn_http_schema, method)}},
{headers, fun emqx_authn_http_schema:headers_no_content_type/1}
] ++ common_fields();
fields(scram_http_post) ->
fields(scram_restapi_post) ->
[
{method, #{type => post, required => true, desc => ?DESC(emqx_authn_http_schema, method)}},
{headers, fun emqx_authn_http_schema:headers/1}
] ++ common_fields().
desc(scram_http_get) ->
desc(scram_restapi_get) ->
?DESC(emqx_authn_http_schema, get);
desc(scram_http_post) ->
desc(scram_restapi_post) ->
?DESC(emqx_authn_http_schema, post);
desc(_) ->
undefined.

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_authn_scram_http_SUITE).
-module(emqx_authn_scram_restapi_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -21,6 +21,9 @@
-define(ALGORITHM_STR, <<"sha512">>).
-define(ITERATION_COUNT, 4096).
-define(T_ACL_USERNAME, <<"username">>).
-define(T_ACL_PASSWORD, <<"password">>).
-include_lib("emqx/include/emqx_placeholder.hrl").
all() ->
@ -54,11 +57,11 @@ init_per_testcase(_Case, Config) ->
[authentication],
?GLOBAL
),
{ok, _} = emqx_authn_scram_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH),
{ok, _} = emqx_authn_scram_restapi_test_server:start_link(?HTTP_PORT, ?HTTP_PATH),
Config.
end_per_testcase(_Case, _Config) ->
ok = emqx_authn_scram_http_test_server:stop().
ok = emqx_authn_scram_restapi_test_server:stop().
%%------------------------------------------------------------------------------
%% Tests
@ -72,7 +75,9 @@ t_create(_Config) ->
{create_authenticator, ?GLOBAL, AuthConfig}
),
{ok, [#{provider := emqx_authn_scram_http}]} = emqx_authn_chains:list_authenticators(?GLOBAL).
{ok, [#{provider := emqx_authn_scram_restapi}]} = emqx_authn_chains:list_authenticators(
?GLOBAL
).
t_create_invalid(_Config) ->
AuthConfig = raw_config(),
@ -118,59 +123,8 @@ t_authenticate(_Config) ->
ok = emqx_config:put([mqtt, idle_timeout], 500),
{ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
ClientFirstMessage = esasl_scram:client_first_message(Username),
ConnectPacket = ?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties = #{
'Authentication-Method' => <<"SCRAM-SHA-512">>,
'Authentication-Data' => ClientFirstMessage
}
}
),
ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket),
%% Intentional sleep to trigger idle timeout for the connection not yet authenticated
ok = ct:sleep(1000),
?AUTH_PACKET(
?RC_CONTINUE_AUTHENTICATION,
#{'Authentication-Data' := ServerFirstMessage}
) = receive_packet(),
{continue, ClientFinalMessage, ClientCache} =
esasl_scram:check_server_first_message(
ServerFirstMessage,
#{
client_first_message => ClientFirstMessage,
password => Password,
algorithm => ?ALGORITHM
}
),
AuthContinuePacket = ?AUTH_PACKET(
?RC_CONTINUE_AUTHENTICATION,
#{
'Authentication-Method' => <<"SCRAM-SHA-512">>,
'Authentication-Data' => ClientFinalMessage
}
),
ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket),
?CONNACK_PACKET(
?RC_SUCCESS,
_,
#{'Authentication-Data' := ServerFinalMessage}
) = receive_packet(),
ok = esasl_scram:check_server_final_message(
ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM}
).
{ok, Pid} = create_connection(Username, Password),
emqx_authn_mqtt_test_client:stop(Pid).
t_authenticate_bad_props(_Config) ->
Username = <<"u">>,
@ -314,6 +268,47 @@ t_destroy(_Config) ->
_
) = receive_packet().
t_acl(_Config) ->
init_auth(),
ACL = emqx_authn_http_SUITE:acl_rules(),
set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{acl => ACL}),
{ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD),
Cases = [
{allow, <<"http-authn-acl/#">>},
{deny, <<"http-authn-acl/1">>},
{deny, <<"t/#">>}
],
try
lists:foreach(
fun(Case) ->
test_acl(Case, Pid)
end,
Cases
)
after
ok = emqx_authn_mqtt_test_client:stop(Pid)
end.
t_auth_expire(_Config) ->
init_auth(),
ExpireSec = 3,
WaitTime = timer:seconds(ExpireSec + 1),
ACL = emqx_authn_http_SUITE:acl_rules(),
set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{
acl => ACL,
expire_at =>
erlang:system_time(second) + ExpireSec
}),
{ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD),
timer:sleep(WaitTime),
?assertEqual(false, erlang:is_process_alive(Pid)).
t_is_superuser() ->
State = init_auth(),
ok = test_is_superuser(State, false),
@ -324,12 +319,12 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
Username = <<"u">>,
Password = <<"p">>,
set_user_handler(Username, Password, ExpectedIsSuperuser),
set_user_handler(Username, Password, #{is_superuser => ExpectedIsSuperuser}),
ClientFirstMessage = esasl_scram:client_first_message(Username),
{continue, ServerFirstMessage, ServerCache} =
emqx_authn_scram_http:authenticate(
emqx_authn_scram_restapi:authenticate(
#{
auth_method => <<"SCRAM-SHA-512">>,
auth_data => ClientFirstMessage,
@ -349,7 +344,7 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
),
{ok, UserInfo1, ServerFinalMessage} =
emqx_authn_scram_http:authenticate(
emqx_authn_scram_restapi:authenticate(
#{
auth_method => <<"SCRAM-SHA-512">>,
auth_data => ClientFinalMessage,
@ -382,24 +377,25 @@ raw_config() ->
}.
set_user_handler(Username, Password) ->
set_user_handler(Username, Password, false).
set_user_handler(Username, Password, IsSuperuser) ->
set_user_handler(Username, Password, #{is_superuser => false}).
set_user_handler(Username, Password, Extra0) ->
%% HTTP Server
Handler = fun(Req0, State) ->
#{
username := Username
} = cowboy_req:match_qs([username], Req0),
UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT, IsSuperuser),
UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT),
Extra = maps:merge(#{is_superuser => false}, Extra0),
Req = cowboy_req:reply(
200,
#{<<"content-type">> => <<"application/json">>},
emqx_utils_json:encode(UserInfo),
emqx_utils_json:encode(maps:merge(Extra, UserInfo)),
Req0
),
{ok, Req, State}
end,
ok = emqx_authn_scram_http_test_server:set_handler(Handler).
ok = emqx_authn_scram_restapi_test_server:set_handler(Handler).
init_auth() ->
init_auth(raw_config()).
@ -413,7 +409,7 @@ init_auth(Config) ->
{ok, [#{state := State}]} = emqx_authn_chains:list_authenticators(?GLOBAL),
State.
make_user_info(Password, Algorithm, IterationCount, IsSuperuser) ->
make_user_info(Password, Algorithm, IterationCount) ->
{StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(
Password,
#{
@ -424,8 +420,7 @@ make_user_info(Password, Algorithm, IterationCount, IsSuperuser) ->
#{
stored_key => binary:encode_hex(StoredKey),
server_key => binary:encode_hex(ServerKey),
salt => binary:encode_hex(Salt),
is_superuser => IsSuperuser
salt => binary:encode_hex(Salt)
}.
receive_packet() ->
@ -436,3 +431,79 @@ receive_packet() ->
after 1000 ->
ct:fail("Deliver timeout")
end.
create_connection(Username, Password) ->
{ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
ClientFirstMessage = esasl_scram:client_first_message(Username),
ConnectPacket = ?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties = #{
'Authentication-Method' => <<"SCRAM-SHA-512">>,
'Authentication-Data' => ClientFirstMessage
}
}
),
ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket),
%% Intentional sleep to trigger idle timeout for the connection not yet authenticated
ok = ct:sleep(1000),
?AUTH_PACKET(
?RC_CONTINUE_AUTHENTICATION,
#{'Authentication-Data' := ServerFirstMessage}
) = receive_packet(),
{continue, ClientFinalMessage, ClientCache} =
esasl_scram:check_server_first_message(
ServerFirstMessage,
#{
client_first_message => ClientFirstMessage,
password => Password,
algorithm => ?ALGORITHM
}
),
AuthContinuePacket = ?AUTH_PACKET(
?RC_CONTINUE_AUTHENTICATION,
#{
'Authentication-Method' => <<"SCRAM-SHA-512">>,
'Authentication-Data' => ClientFinalMessage
}
),
ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket),
?CONNACK_PACKET(
?RC_SUCCESS,
_,
#{'Authentication-Data' := ServerFinalMessage}
) = receive_packet(),
ok = esasl_scram:check_server_final_message(
ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM}
),
{ok, Pid}.
test_acl({allow, Topic}, C) ->
?assertMatch(
[0],
send_subscribe(C, Topic)
);
test_acl({deny, Topic}, C) ->
?assertMatch(
[?RC_NOT_AUTHORIZED],
send_subscribe(C, Topic)
).
send_subscribe(Client, Topic) ->
TopicOpts = #{nl => 0, rap => 0, rh => 0, qos => 0},
Packet = ?SUBSCRIBE_PACKET(1, [{Topic, TopicOpts}]),
emqx_authn_mqtt_test_client:send(Client, Packet),
timer:sleep(200),
?SUBACK_PACKET(1, ReasonCode) = receive_packet(),
ReasonCode.

View File

@ -2,7 +2,7 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_authn_scram_http_test_server).
-module(emqx_authn_scram_restapi_test_server).
-behaviour(supervisor).
-behaviour(cowboy_handler).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_jwt, [
{description, "EMQX JWT Authentication and Authorization"},
{vsn, "0.3.2"},
{vsn, "0.3.3"},
{registered, []},
{mod, {emqx_auth_jwt_app, []}},
{applications, [

View File

@ -21,6 +21,7 @@
-include_lib("emqx_auth/include/emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(LDAP_HOST, "ldap").
-define(LDAP_DEFAULT_PORT, 389).
@ -46,13 +47,6 @@ init_per_suite(Config) ->
Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_ldap], #{
work_dir => ?config(priv_dir, Config)
}),
{ok, _} = emqx_resource:create_local(
?LDAP_RESOURCE,
?AUTHN_RESOURCE_GROUP,
emqx_ldap,
ldap_config(),
#{}
),
[{apps, Apps} | Config];
false ->
{skip, no_ldap}
@ -63,7 +57,6 @@ end_per_suite(Config) ->
[authentication],
?GLOBAL
),
ok = emqx_resource:remove_local(?LDAP_RESOURCE),
ok = emqx_cth_suite:stop(?config(apps, Config)).
%%------------------------------------------------------------------------------
@ -128,6 +121,87 @@ t_create_invalid(_Config) ->
InvalidConfigs
).
t_authenticate_timeout_cause_reconnect(_Config) ->
TestPid = self(),
meck:new(eldap, [non_strict, no_link, passthrough]),
try
%% cause eldap process to be killed
meck:expect(
eldap,
search,
fun
(Pid, [{base, <<"uid=mqttuser0007", _/binary>>} | _]) ->
TestPid ! {eldap_pid, Pid},
{error, {gen_tcp_error, timeout}};
(Pid, Args) ->
meck:passthrough([Pid, Args])
end
),
Credentials = fun(Username) ->
#{
username => Username,
password => Username,
listener => 'tcp:default',
protocol => mqtt
}
end,
SpecificConfigParams = #{},
Result = {ok, #{is_superuser => true}},
Timeout = 1000,
Config0 = raw_ldap_auth_config(),
Config = Config0#{
<<"pool_size">> => 1,
<<"request_timeout">> => Timeout
},
AuthConfig = maps:merge(Config, SpecificConfigParams),
{ok, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, AuthConfig}
),
%% 0006 is a disabled user
?assertEqual(
{error, user_disabled},
emqx_access_control:authenticate(Credentials(<<"mqttuser0006">>))
),
?assertEqual(
{error, not_authorized},
emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))
),
ok = wait_for_ldap_pid(1000),
[#{id := ResourceID}] = emqx_resource_manager:list_all(),
?retry(1_000, 10, {ok, connected} = emqx_resource_manager:health_check(ResourceID)),
%% turn back to normal
meck:expect(
eldap,
search,
2,
fun(Pid2, Query) ->
meck:passthrough([Pid2, Query])
end
),
%% expect eldap process to be restarted
?assertEqual(Result, emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))),
emqx_authn_test_lib:delete_authenticators(
[authentication],
?GLOBAL
)
after
meck:unload(eldap)
end.
wait_for_ldap_pid(After) ->
receive
{eldap_pid, Pid} ->
?assertNot(is_process_alive(Pid)),
ok
after After ->
error(timeout)
end.
t_authenticate(_Config) ->
ok = lists:foreach(
fun(Sample) ->
@ -300,6 +374,3 @@ user_seeds() ->
ldap_server() ->
iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])).
ldap_config() ->
emqx_ldap_SUITE:ldap_config([]).

View File

@ -44,7 +44,6 @@ init_per_suite(Config) ->
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
ok = create_ldap_resource(),
[{apps, Apps} | Config];
false ->
{skip, no_ldap}
@ -167,21 +166,8 @@ setup_config(SpecialParams) ->
ldap_server() ->
iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])).
ldap_config() ->
emqx_ldap_SUITE:ldap_config([]).
start_apps(Apps) ->
lists:foreach(fun application:ensure_all_started/1, Apps).
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).
create_ldap_resource() ->
{ok, _} = emqx_resource:create_local(
?LDAP_RESOURCE,
?AUTHZ_RESOURCE_GROUP,
emqx_ldap,
ldap_config(),
#{}
),
ok.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_mnesia, [
{description, "EMQX Built-in Database Authentication and Authorization"},
{vsn, "0.1.6"},
{vsn, "0.1.7"},
{registered, []},
{mod, {emqx_auth_mnesia_app, []}},
{applications, [

View File

@ -141,7 +141,9 @@ authenticate(
reason => Reason
})
end,
emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State);
emqx_utils_scram:authenticate(
AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, [is_superuser]
);
authenticate(_Credential, _State) ->
ignore.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_mongodb, [
{description, "EMQX MongoDB Authentication and Authorization"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{registered, []},
{mod, {emqx_auth_mongodb_app, []}},
{applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_mysql, [
{description, "EMQX MySQL Authentication and Authorization"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{registered, []},
{mod, {emqx_auth_mysql_app, []}},
{applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_postgresql, [
{description, "EMQX PostgreSQL Authentication and Authorization"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{registered, []},
{mod, {emqx_auth_postgresql_app, []}},
{applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_redis, [
{description, "EMQX Redis Authentication and Authorization"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{registered, []},
{mod, {emqx_auth_redis_app, []}},
{applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
{vsn, "0.2.3"},
{vsn, "0.2.4"},
{registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}},
{applications, [

View File

@ -1154,7 +1154,7 @@ t_bridges_probe(Config) ->
?assertMatch(
{ok, 400, #{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Connection refused">>
<<"message">> := <<"Connection refused", _/binary>>
}},
request_json(
post,

View File

@ -889,7 +889,8 @@ t_sync_query_down(Config, Opts) ->
),
?force_ordering(
#{?snk_kind := call_query},
#{?snk_kind := SNKKind} when
SNKKind =:= call_query orelse SNKKind =:= simple_query_enter,
#{?snk_kind := cut_connection, ?snk_span := start}
),
%% Note: order of arguments here is reversed compared to `?force_ordering'.
@ -913,6 +914,7 @@ t_sync_query_down(Config, Opts) ->
emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort)
)
end),
?tp("publishing_message", #{}),
try
{_, {ok, _}} =
snabbkaffe:wait_async_action(
@ -921,6 +923,7 @@ t_sync_query_down(Config, Opts) ->
infinity
)
after
?tp("healing_failure", #{}),
emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort)
end,
{ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity),

View File

@ -23,7 +23,7 @@ defmodule EMQXBridgeAzureEventHub.MixProject do
def deps() do
[
{:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
{:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

View File

@ -382,12 +382,31 @@ t_multiple_actions_sharing_topic(Config) ->
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.
t_dynamic_topics(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?BRIDGE_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.
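
Both wrappers now delegate through ?FUNCTION_NAME; a minimal sketch of this predefined macro (demo module name invented):

%% ?FUNCTION_NAME expands at compile time to the enclosing function's
%% name as an atom, so a wrapper can forward to an identically named
%% function in another module without repeating the name.
-module(fname_demo).
-export([t_multiple_actions_sharing_topic/0]).

t_multiple_actions_sharing_topic() ->
    ?FUNCTION_NAME. %% evaluates to t_multiple_actions_sharing_topic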

View File

@ -23,7 +23,7 @@ defmodule EMQXBridgeConfluent.MixProject do
def deps() do
[
{:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
{:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

View File

@ -391,12 +391,31 @@ t_multiple_actions_sharing_topic(Config) ->
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.
t_dynamic_topics(Config) ->
ActionConfig0 = ?config(action_config, Config),
ActionConfig =
emqx_utils_maps:deep_merge(
ActionConfig0,
#{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
),
ok =
emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
[
{type, ?ACTION_TYPE_BIN},
{connector_name, ?config(connector_name, Config)},
{connector_config, ?config(connector_config, Config)},
{action_config, ActionConfig}
]
),
ok.

View File

@ -23,6 +23,7 @@ defmodule EMQXBridgeGcpPubsub.MixProject do
def deps() do
[
{:emqx_connector_jwt, in_umbrella: true},
{:emqx_connector, in_umbrella: true, runtime: false},
{:emqx_resource, in_umbrella: true},
{:emqx_bridge, in_umbrella: true, runtime: false},

View File

@ -9,6 +9,7 @@
debug_info
]}.
{deps, [
{emqx_connector_jwt, {path, "../../apps/emqx_connector_jwt"}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}},

View File

@ -1,12 +1,13 @@
{application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.3.2"},
{vsn, "0.3.3"},
{registered, []},
{applications, [
kernel,
stdlib,
emqx_resource,
ehttpc
ehttpc,
emqx_connector_jwt
]},
{env, [
{emqx_action_info_modules, [

View File

@ -5,7 +5,7 @@
-module(emqx_bridge_gcp_pubsub_client).
-include_lib("jose/include/jose_jwk.hrl").
-include_lib("emqx_connector/include/emqx_connector_tables.hrl").
-include_lib("emqx_connector_jwt/include/emqx_connector_jwt_tables.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").

View File

@ -594,7 +594,7 @@ cluster(Config) ->
Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core],
[
{apps, [emqx_conf, emqx_rule_engine, emqx_bridge]},
{apps, [emqx_conf, emqx_rule_engine, emqx_bridge_gcp_pubsub, emqx_bridge]},
{listener_ports, []},
{priv_data_dir, PrivDataDir},
{load_schema, true},

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_http, [
{description, "EMQX HTTP Bridge and Connector Application"},
{vsn, "0.3.3"},
{vsn, "0.3.4"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource, ehttpc]},
{env, [

View File

@ -23,7 +23,7 @@ defmodule EMQXBridgeKafka.MixProject do
def deps() do
[
{:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
{:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.18.0"},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.3.3"},
{vsn, "0.3.4"},
{registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [
kernel,

View File

@ -295,6 +295,7 @@ fields("config_producer") ->
fields("config_consumer") ->
fields(kafka_consumer);
fields(kafka_producer) ->
%% Schema used by bridges V1.
connector_config_fields() ++ producer_opts(v1);
fields(kafka_producer_action) ->
emqx_bridge_v2_schema:common_fields() ++ producer_opts(action);
@ -356,9 +357,33 @@ fields(socket_opts) ->
validator => fun emqx_schema:validate_tcp_keepalive/1
})}
];
fields(v1_producer_kafka_opts) ->
OldSchemaFields =
[
topic,
message,
max_batch_bytes,
compression,
partition_strategy,
required_acks,
kafka_headers,
kafka_ext_headers,
kafka_header_value_encode_mode,
partition_count_refresh_interval,
partitions_limit,
max_inflight,
buffer,
query_mode,
sync_query_timeout
],
Fields = fields(producer_kafka_opts),
lists:filter(
fun({K, _V}) -> lists:member(K, OldSchemaFields) end,
Fields
);
fields(producer_kafka_opts) ->
[
{topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
{topic, mk(emqx_schema:template(), #{required => true, desc => ?DESC(kafka_topic)})},
{message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})},
{max_batch_bytes,
mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})},
@ -672,15 +697,15 @@ resource_opts() ->
%% However we need to keep it backward compatible for generated schema json (version 0.1.0)
%% since schema is data for the 'schemas' API.
parameters_field(ActionOrBridgeV1) ->
{Name, Alias} =
{Name, Alias, Ref} =
case ActionOrBridgeV1 of
v1 ->
{kafka, parameters};
{kafka, parameters, v1_producer_kafka_opts};
action ->
{parameters, kafka}
{parameters, kafka, producer_kafka_opts}
end,
{Name,
mk(ref(producer_kafka_opts), #{
mk(ref(Ref), #{
required => true,
aliases => [Alias],
desc => ?DESC(producer_kafka_opts),
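
With the topic now typed as emqx_schema:template(), it may carry placeholders; a hedged sketch of the fixed-versus-dynamic split performed by the producer implementation later in this change set (emqx_template:parse/1, placeholders/1 and render_strict/2 appear in that code; here the bindings are shown as a bare map for brevity, while the producer itself renders against {emqx_jsonish, Message}):

%% A topic without placeholders is pinned once at creation; one with
%% "${...}" placeholders is rendered per message.
classify_topic(Topic) ->
    Template = emqx_template:parse(Topic),
    case emqx_template:placeholders(Template) of
        [] -> {fixed, Topic};
        [_ | _] -> {dynamic, Template}
    end.

render_topic({fixed, Topic}, _Bindings) ->
    Topic;
render_topic({dynamic, Template}, Bindings) ->
    iolist_to_binary(emqx_template:render_strict(Template, Bindings)).
%% classify_topic(<<"telemetry">>)              -> {fixed, <<"telemetry">>}
%% classify_topic(<<"devices/${clientid}/up">>) -> {dynamic, ...}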

View File

@ -3,6 +3,8 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_impl_producer).
-feature(maybe_expr, enable).
-behaviour(emqx_resource).
-include_lib("emqx_resource/include/emqx_resource.hrl").
@ -125,8 +127,8 @@ on_add_channel(
{ok, NewState}.
create_producers_for_bridge_v2(
InstId,
BridgeV2Id,
ConnResId,
ActionResId,
ClientId,
#{
bridge_type := BridgeType,
@ -135,33 +137,42 @@ create_producers_for_bridge_v2(
) ->
#{
message := MessageTemplate,
topic := KafkaTopic,
topic := KafkaTopic0,
sync_query_timeout := SyncQueryTimeout
} = KafkaConfig,
TopicTemplate = {TopicType, TopicOrTemplate} = maybe_preproc_topic(KafkaTopic0),
MKafkaTopic =
case TopicType of
fixed -> TopicOrTemplate;
dynamic -> dynamic
end,
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
#{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
IsDryRun = emqx_resource:is_dry_run(BridgeV2Id),
ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
#{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId),
IsDryRun = emqx_resource:is_dry_run(ActionResId),
ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
WolffProducerConfig = producers_config(
BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
),
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of
{ok, Producers} ->
ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers),
ok = emqx_resource:allocate_resource(
InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id
ConnResId, {?kafka_producers, ActionResId}, Producers
),
_ = maybe_install_wolff_telemetry_handlers(BridgeV2Id),
ok = emqx_resource:allocate_resource(
ConnResId, {?kafka_telemetry_id, ActionResId}, ActionResId
),
_ = maybe_install_wolff_telemetry_handlers(ActionResId),
{ok, #{
message_template => compile_message_template(MessageTemplate),
kafka_client_id => ClientId,
kafka_topic => KafkaTopic,
topic_template => TopicTemplate,
topic => MKafkaTopic,
producers => Producers,
resource_id => BridgeV2Id,
connector_resource_id => InstId,
resource_id => ActionResId,
connector_resource_id => ConnResId,
sync_query_timeout => SyncQueryTimeout,
kafka_config => KafkaConfig,
headers_tokens => KafkaHeadersTokens,
@ -172,9 +183,9 @@ create_producers_for_bridge_v2(
{error, Reason2} ->
?SLOG(error, #{
msg => "failed_to_start_kafka_producer",
instance_id => InstId,
instance_id => ConnResId,
kafka_client_id => ClientId,
kafka_topic => KafkaTopic,
kafka_topic => MKafkaTopic,
reason => Reason2
}),
throw(
@ -267,7 +278,9 @@ remove_producers_for_bridge_v2(
ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id),
maps:foreach(
fun
({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id ->
({?kafka_producers, BridgeV2IdCheck}, Producers) when
BridgeV2IdCheck =:= BridgeV2Id
->
deallocate_producers(ClientId, Producers);
({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when
BridgeV2IdCheck =:= BridgeV2Id
@ -300,7 +313,8 @@ on_query(
#{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState
) ->
#{
message_template := Template,
message_template := MessageTemplate,
topic_template := TopicTemplate,
producers := Producers,
sync_query_timeout := SyncTimeout,
headers_tokens := KafkaHeadersTokens,
@ -313,7 +327,8 @@ on_query(
headers_val_encode_mode => KafkaHeadersValEncodeMode
},
try
- KafkaMessage = render_message(Template, KafkaHeaders, Message),
+ KafkaTopic = render_topic(TopicTemplate, Message),
+ KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message),
?tp(
emqx_bridge_kafka_impl_producer_sync_query,
#{headers_config => KafkaHeaders, instance_id => InstId}
@@ -321,9 +336,15 @@ on_query(
emqx_trace:rendered_action_template(MessageTag, #{
message => KafkaMessage
}),
- do_send_msg(sync, KafkaMessage, Producers, SyncTimeout)
+ do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout)
catch
- error:{invalid_partition_count, Count, _Partitioner} ->
+ throw:bad_topic ->
+     ?tp("kafka_producer_failed_to_render_topic", #{}),
+     {error, {unrecoverable_error, failed_to_render_topic}};
+ throw:#{cause := unknown_topic_or_partition, topic := Topic} ->
+     ?tp("kafka_producer_resolved_to_unknown_topic", #{}),
+     {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
+ throw:#{cause := invalid_partition_count, count := Count} ->
?tp("kafka_producer_invalid_partition_count", #{
action_id => MessageTag,
query_mode => sync
@@ -368,6 +389,7 @@ on_query_async(
) ->
#{
message_template := Template,
+ topic_template := TopicTemplate,
producers := Producers,
headers_tokens := KafkaHeadersTokens,
ext_headers_tokens := KafkaExtHeadersTokens,
@@ -379,6 +401,7 @@ on_query_async(
headers_val_encode_mode => KafkaHeadersValEncodeMode
},
try
+ KafkaTopic = render_topic(TopicTemplate, Message),
KafkaMessage = render_message(Template, KafkaHeaders, Message),
?tp(
emqx_bridge_kafka_impl_producer_async_query,
@@ -387,9 +410,15 @@ on_query_async(
emqx_trace:rendered_action_template(MessageTag, #{
message => KafkaMessage
}),
- do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn)
+ do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn)
catch
- error:{invalid_partition_count, Count, _Partitioner} ->
+ throw:bad_topic ->
+     ?tp("kafka_producer_failed_to_render_topic", #{}),
+     {error, {unrecoverable_error, failed_to_render_topic}};
+ throw:#{cause := unknown_topic_or_partition, topic := Topic} ->
+     ?tp("kafka_producer_resolved_to_unknown_topic", #{}),
+     {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
+ throw:#{cause := invalid_partition_count, count := Count} ->
?tp("kafka_producer_invalid_partition_count", #{
action_id => MessageTag,
query_mode => async
@@ -427,9 +456,28 @@ compile_message_template(T) ->
timestamp => preproc_tmpl(TimestampTemplate)
}.
+ maybe_preproc_topic(Topic) ->
+     Template = emqx_template:parse(Topic),
+     case emqx_template:placeholders(Template) of
+         [] ->
+             {fixed, bin(Topic)};
+         [_ | _] ->
+             {dynamic, Template}
+     end.
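Note: the two shapes returned above drive everything else in this change. A quick sketch of both (topic names hypothetical; return shapes as implied by the emqx_template calls):

    %% No placeholders: the topic is resolved once, when the producers start.
    {fixed, <<"telemetry">>} = maybe_preproc_topic(<<"telemetry">>),
    %% With placeholders: parsing happens once, rendering is deferred per message.
    {dynamic, _Template} = maybe_preproc_topic(<<"pct${.payload.n}">>).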
preproc_tmpl(Tmpl) ->
emqx_placeholder:preproc_tmpl(Tmpl).
+ render_topic({fixed, KafkaTopic}, _Message) ->
+     KafkaTopic;
+ render_topic({dynamic, Template}, Message) ->
+     try
+         iolist_to_binary(emqx_template:render_strict(Template, {emqx_jsonish, Message}))
+     catch
+         error:_Errors ->
+             throw(bad_topic)
+     end.
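Note: a minimal usage sketch, assuming a message whose payload has already been decoded into a map (key names are illustrative):

    Msg = #{payload => #{<<"n">> => 1}},
    {dynamic, Template} = maybe_preproc_topic(<<"pct${.payload.n}">>),
    <<"pct1">> = render_topic({dynamic, Template}, Msg).
    %% If `.payload.n' is absent, render_strict/2 raises; that is converted to
    %% throw(bad_topic) here, which on_query/on_query_async above turn into
    %% {error, {unrecoverable_error, failed_to_render_topic}}.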
render_message(
#{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate},
#{
@@ -471,9 +519,11 @@ render_timestamp(Template, Message) ->
erlang:system_time(millisecond)
end.
- do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
+ do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) ->
  try
-     {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
+     {_Partition, _Offset} = wolff:send_sync2(
+         Producers, KafkaTopic, [KafkaMessage], SyncTimeout
+     ),
ok
catch
error:{producer_down, _} = Reason ->
@@ -481,7 +531,7 @@ do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
error:timeout ->
{error, timeout}
end;
- do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
+ do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) ->
%% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
%% * Must be a single element batch because wolff books calls, but not batch sizes
%% for counters and gauges.
@@ -489,7 +539,9 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
%% The returned information is discarded here.
%% If the producer process is down when sending, this function would
%% raise an error exception which is to be caught by the caller of this callback
- {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}),
+ {_Partition, Pid} = wolff:send2(
+     Producers, KafkaTopic, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}
+ ),
%% this Pid is so far never used because the Kafka producer bypasses the buffer worker
{ok, Pid}.
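Note: both clauses now hand the per-message topic to wolff's topic-taking APIs instead of relying on producers bound to a single topic at start-up. The shape of the two calls, with placeholder variables (Producers, Topic, Msg, Timeout, ReplyFn are stand-ins):

    %% sync: waits up to Timeout for the {Partition, Offset} ack
    {_Partition, _Offset} = wolff:send_sync2(Producers, Topic, [Msg], Timeout),
    %% async: returns immediately; on_kafka_ack/3 later invokes the reply fun
    {_Partition, _Pid} = wolff:send2(
        Producers, Topic, [Msg], {fun ?MODULE:on_kafka_ack/3, [ReplyFn]}
    ).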
@@ -512,7 +564,7 @@ on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) ->
%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
%% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
on_get_status(
- _InstId,
+ ConnResId,
#{client_id := ClientId} = State
) ->
%% Note: we must avoid returning `?status_disconnected' here if the connector ever was
@@ -522,7 +574,7 @@ on_get_status(
%% held in wolff producer's replayq.
case check_client_connectivity(ClientId) of
ok ->
- maybe_check_health_check_topic(State);
+ maybe_check_health_check_topic(ConnResId, State);
{error, {find_client, _Error}} ->
?status_connecting;
{error, {connectivity, Error}} ->
@@ -530,20 +582,23 @@ on_get_status(
end.
on_get_channel_status(
- _ResId,
- ChannelId,
+ _ConnResId,
+ ActionResId,
#{
client_id := ClientId,
installed_bridge_v2s := Channels
- } = _State
+ } = _ConnState
) ->
%% Note: we must avoid returning `?status_disconnected' here. Returning
%% `?status_disconnected' will make resource manager try to restart the producers /
%% connector, thus potentially dropping data held in wolff producer's replayq. The
%% only exception is if the topic does not exist ("unhealthy target").
- #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels),
+ #{
+     topic := MKafkaTopic,
+     partitions_limit := MaxPartitions
+ } = maps:get(ActionResId, Channels),
try
- ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
+ ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
?status_connected
catch
throw:{unhealthy_target, Msg} ->
@@ -552,22 +607,29 @@ on_get_channel_status(
{?status_connecting, {K, E}}
end.
- check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
+ check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
-     ok = check_topic_status(ClientId, Pid, KafkaTopic),
-     ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
+     maybe
+         true ?= is_binary(MKafkaTopic),
+         ok = check_topic_status(ClientId, Pid, MKafkaTopic),
+         ok = check_if_healthy_leaders(
+             ActionResId, ClientId, Pid, MKafkaTopic, MaxPartitions
+         )
+     else
+         false -> ok
+     end;
{error, #{reason := no_such_client}} ->
throw(#{
reason => cannot_find_kafka_client,
kafka_client => ClientId,
- kafka_topic => KafkaTopic
+ kafka_topic => MKafkaTopic
});
{error, #{reason := client_supervisor_not_initialized}} ->
throw(#{
reason => restarting,
kafka_client => ClientId,
- kafka_topic => KafkaTopic
+ kafka_topic => MKafkaTopic
})
end.
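Note: the `maybe` block above is OTP's maybe_expr (enabled by default since OTP 27; earlier releases need `-feature(maybe_expr, enable).`). For a dynamic topic, MKafkaTopic is the atom `dynamic`, so `true ?= is_binary(MKafkaTopic)` fails its match, the value `false` falls through to the `else` clause, and the per-topic checks are skipped. Stripped down to the pattern (do_check/1 is a stand-in):

    check(Topic) ->
        maybe
            true ?= is_binary(Topic),
            ok = do_check(Topic)
        else
            %% dynamic topic: nothing known to check up front
            false -> ok
        end.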
@@ -586,21 +648,23 @@ check_client_connectivity(ClientId) ->
{error, {find_client, Reason}}
end.
- maybe_check_health_check_topic(#{health_check_topic := Topic} = ConnectorState) when
+ maybe_check_health_check_topic(ConnResId, #{health_check_topic := Topic} = ConnectorState) when
is_binary(Topic)
->
#{client_id := ClientId} = ConnectorState,
MaxPartitions = all_partitions,
- try check_topic_and_leader_connections(ClientId, Topic, MaxPartitions) of
+ try check_topic_and_leader_connections(ConnResId, ClientId, Topic, MaxPartitions) of
ok ->
?status_connected
catch
throw:{unhealthy_target, Msg} ->
{?status_disconnected, ConnectorState, Msg};
+ throw:#{reason := {connection_down, _} = Reason} ->
+     {?status_disconnected, ConnectorState, Reason};
throw:#{reason := Reason} ->
{?status_connecting, ConnectorState, Reason}
end;
- maybe_check_health_check_topic(_) ->
+ maybe_check_health_check_topic(_ConnResId, _ConnState) ->
%% Cannot infer further information. Maybe upgraded from older version.
?status_connected.
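Note: `health_check_topic` is optional connector configuration; connectors created before the field existed carry no such key, hence the unconditional pass-through clause. A hypothetical connector config fragment that exercises the check (field name as used in the test changes further below):

    #{
        <<"bootstrap_hosts">> => <<"kafka-1.example.com:9092">>,
        <<"health_check_topic">> => <<"probe">>
    }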
@@ -612,8 +676,10 @@ error_summary(Map, [Error]) ->
error_summary(Map, [Error | More]) ->
Map#{first_error => Error, total_errors => length(More) + 1}.
- check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
-     case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
+ check_if_healthy_leaders(ActionResId, ClientId, ClientPid, KafkaTopic, MaxPartitions) when
+     is_pid(ClientPid)
+ ->
+     case wolff_client:get_leader_connections(ClientPid, ActionResId, KafkaTopic, MaxPartitions) of
{ok, Leaders} ->
%% Kafka is considered healthy as long as any of the partition leaders is reachable.
case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of
@@ -675,7 +741,7 @@ ssl(#{enable := true} = SSL) ->
ssl(_) ->
false.
- producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
+ producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) ->
#{
max_batch_bytes := MaxBatchBytes,
compression := Compression,
@@ -717,8 +783,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
max_batch_bytes => MaxBatchBytes,
max_send_ahead => MaxInflight - 1,
compression => Compression,
- alias => BridgeV2Id,
- telemetry_meta_data => #{bridge_id => BridgeV2Id},
+ group => ActionResId,
+ telemetry_meta_data => #{bridge_id => ActionResId},
max_partitions => MaxPartitions
}.
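Note: with dynamic producers there is no single topic to key the producer set on, so the action resource id becomes the wolff `group`, and the same id rides along in the telemetry metadata so that metrics are attributed per action rather than per topic. A sketch of the resulting config shape, restricted to the fields this hunk touches (values illustrative):

    #{
        max_batch_bytes => MaxBatchBytes,
        max_send_ahead => MaxInflight - 1,
        compression => no_compression,
        group => ActionResId,
        telemetry_meta_data => #{bridge_id => ActionResId},
        max_partitions => all_partitions
    }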
@@ -794,20 +860,19 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
%% Note: don't use the instance/manager ID, as that changes every time
%% the bridge is recreated, and will lead to multiplication of
%% metrics.
- -spec telemetry_handler_id(resource_id()) -> binary().
- telemetry_handler_id(ResourceID) ->
-     <<"emqx-bridge-kafka-producer-", ResourceID/binary>>.
+ -spec telemetry_handler_id(action_resource_id()) -> binary().
+ telemetry_handler_id(ActionResId) ->
+     ActionResId.

- uninstall_telemetry_handlers(ResourceID) ->
-     HandlerID = telemetry_handler_id(ResourceID),
-     telemetry:detach(HandlerID).
+ uninstall_telemetry_handlers(TelemetryId) ->
+     telemetry:detach(TelemetryId).
- maybe_install_wolff_telemetry_handlers(ResourceID) ->
+ maybe_install_wolff_telemetry_handlers(TelemetryId) ->
%% Attach event handlers for Kafka telemetry events. If a handler with the
%% handler id already exists, the attach_many function does nothing
telemetry:attach_many(
%% unique handler id
- telemetry_handler_id(ResourceID),
+ telemetry_handler_id(TelemetryId),
[
[wolff, dropped_queue_full],
[wolff, queuing],
@@ -819,7 +884,7 @@ maybe_install_wolff_telemetry_handlers(ResourceID) ->
%% wolff producers; otherwise, multiple kafka producer bridges
%% will install multiple handlers to the same wolff events,
%% multiplying the metric counts...
- #{bridge_id => ResourceID}
+ #{bridge_id => TelemetryId}
).
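Note: the handler id is now simply the action resource id, so handler install/uninstall stays in lockstep with the action lifecycle. telemetry:attach_many/4 returns {error, already_exists} when a handler with the same id is already attached, which is what makes the install above idempotent, e.g.:

    ok = telemetry:attach_many(Id, Events, Fun, Cfg),
    {error, already_exists} = telemetry:attach_many(Id, Events, Fun, Cfg),
    ok = telemetry:detach(Id).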
preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined ->


@@ -26,7 +26,12 @@ schema_module() -> emqx_bridge_kafka.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
- emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2).
+ BridgeV1Config = emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2),
+ maps:update_with(
+     <<"kafka">>,
+     fun(Params) -> maps:with(v1_parameters(), Params) end,
+     BridgeV1Config
+ ).
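Note: the added maps:update_with/maps:with pass prunes v2-only parameters when downgrading an action to a v1 bridge config, so the result only carries keys the v1 schema knows about. Illustration with hypothetical keys:

    Kafka = #{<<"topic">> => <<"t1">>, <<"v2_only_opt">> => true},
    #{<<"topic">> := <<"t1">>} = maps:with([<<"topic">>], Kafka).
    %% <<"v2_only_opt">> is dropped; v1_parameters() supplies the allow-list.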
bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) ->
%% Ancient v1 config, when `kafka' key was wrapped by `producer'
@@ -51,6 +56,12 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
%% Internal helper functions
%%------------------------------------------------------------------------------------------
+ v1_parameters() ->
+     [
+         to_bin(K)
+      || {K, _} <- emqx_bridge_kafka:fields(v1_producer_kafka_opts)
+     ].
producer_action_field_keys() ->
[
to_bin(K)


@@ -477,7 +477,7 @@ do_start_producer(KafkaClientId, KafkaTopic) ->
ProducerConfig =
#{
name => Name,
- partitioner => roundrobin,
+ partitioner => random,
partition_count_refresh_interval_seconds => 1_000,
replayq_max_total_bytes => 10_000,
replayq_seg_bytes => 9_000,
@@ -1520,7 +1520,7 @@ t_receive_after_recovery(Config) ->
key => <<"commit", (integer_to_binary(N))/binary>>,
value => <<"commit", (integer_to_binary(N))/binary>>
}
- || N <- lists:seq(1, NPartitions)
+ || N <- lists:seq(1, NPartitions * 10)
],
%% we do distinct passes over this producing part so that
%% wolff won't batch everything together.
@@ -1918,13 +1918,14 @@ t_node_joins_existing_cluster(Config) ->
_Attempts2 = 50,
[] =/= erpc:call(N2, emqx_router, lookup_routes, [MQTTTopic])
),
+ NumMsgs = 50 * NPartitions,
{ok, SRef1} =
snabbkaffe:subscribe(
?match_event(#{
?snk_kind := kafka_consumer_handle_message,
?snk_span := {complete, _}
}),
- NPartitions,
+ NumMsgs,
20_000
),
lists:foreach(
@@ -1933,7 +1934,7 @@ t_node_joins_existing_cluster(Config) ->
Val = <<"v", (integer_to_binary(N))/binary>>,
publish(Config, KafkaTopic, [#{key => Key, value => Val}])
end,
- lists:seq(1, NPartitions)
+ lists:seq(1, NumMsgs)
),
{ok, _} = snabbkaffe:receive_events(SRef1),


@@ -23,6 +23,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("brod/include/brod.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx/include/asserts.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
@@ -165,6 +166,9 @@ send_message(Type, ActionName) ->
resolve_kafka_offset() ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+     resolve_kafka_offset(KafkaTopic).
+
+ resolve_kafka_offset(KafkaTopic) ->
Partition = 0,
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
{ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
@@ -174,11 +178,32 @@ resolve_kafka_offset() ->
check_kafka_message_payload(Offset, ExpectedPayload) ->
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+     check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload).
+
+ check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) ->
Partition = 0,
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
{ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
+ ensure_kafka_topic(KafkaTopic) ->
+     TopicConfigs = [
+         #{
+             name => KafkaTopic,
+             num_partitions => 1,
+             replication_factor => 1,
+             assignments => [],
+             configs => []
+         }
+     ],
+     RequestConfig = #{timeout => 5_000},
+     ConnConfig = #{},
+     Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
+     case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of
+         ok -> ok;
+         {error, topic_already_exists} -> ok
+     end.
action_config(ConnectorName) ->
action_config(ConnectorName, _Overrides = #{}).
@@ -715,6 +740,21 @@ t_connector_health_check_topic(_Config) ->
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig1)
),
+ %% By providing a nonexistent health check topic, we should detect it's
+ %% disconnected without the need for an action.
+ ConnectorConfig2 = connector_config(#{
+     <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
+     <<"health_check_topic">> => <<"i-dont-exist-999">>
+ }),
+ ?assertMatch(
+     {ok,
+         {{_, 200, _}, _, #{
+             <<"status">> := <<"disconnected">>,
+             <<"status_reason">> := <<"Unknown topic or partition", _/binary>>
+         }}},
+     emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig2)
+ ),
ok
end,
[]
@@ -768,9 +808,13 @@ t_invalid_partition_count_metrics(Config) ->
%% Simulate `invalid_partition_count'
emqx_common_test_helpers:with_mock(
wolff,
- send_sync,
- fun(_Producers, _Msgs, _Timeout) ->
-     error({invalid_partition_count, 0, partitioner})
+ send_sync2,
+ fun(_Producers, _Topic, _Msgs, _Timeout) ->
+     throw(#{
+         cause => invalid_partition_count,
+         count => 0,
+         partitioner => partitioner
+     })
end,
fun() ->
{{ok, _}, {ok, _}} =
@@ -813,9 +857,13 @@ t_invalid_partition_count_metrics(Config) ->
%% Simulate `invalid_partition_count'
emqx_common_test_helpers:with_mock(
wolff,
- send,
- fun(_Producers, _Msgs, _Timeout) ->
-     error({invalid_partition_count, 0, partitioner})
+ send2,
+ fun(_Producers, _Topic, _Msgs, _AckCallback) ->
+     throw(#{
+         cause => invalid_partition_count,
+         count => 0,
+         partitioner => partitioner
+     })
end,
fun() ->
{{ok, _}, {ok, _}} =
@@ -921,3 +969,126 @@ t_multiple_actions_sharing_topic(Config) ->
end
),
ok.
%% Smoke tests for using a templated topic and dynamic kafka topics.
t_dynamic_topics(Config) ->
Type = proplists:get_value(type, Config, ?TYPE),
ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
ActionName = <<"dynamic_topics">>,
ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
PreConfiguredTopic1 = <<"pct1">>,
PreConfiguredTopic2 = <<"pct2">>,
ensure_kafka_topic(PreConfiguredTopic1),
ensure_kafka_topic(PreConfiguredTopic2),
ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
action,
Type,
ActionName,
emqx_utils_maps:deep_merge(
ActionConfig1,
#{
<<"parameters">> => #{
<<"topic">> => <<"pct${.payload.n}">>,
<<"message">> => #{
<<"key">> => <<"${.clientid}">>,
<<"value">> => <<"${.payload.p}">>
}
}
}
)
),
?check_trace(
#{timetrap => 7_000},
begin
ConnectorParams = [
{connector_config, ConnectorConfig},
{connector_name, ConnectorName},
{connector_type, Type}
],
ActionParams = [
{action_config, ActionConfig},
{action_name, ActionName},
{action_type, Type}
],
{ok, {{_, 201, _}, _, #{}}} =
emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
{ok, {{_, 201, _}, _, #{}}} =
emqx_bridge_v2_testlib:create_action_api(ActionParams),
RuleTopic = <<"pct">>,
{ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
Type,
RuleTopic,
[
{bridge_name, ActionName}
]
),
?assertStatusAPI(Type, ActionName, <<"connected">>),
HandlerId = ?FUNCTION_NAME,
TestPid = self(),
telemetry:attach_many(
HandlerId,
emqx_resource_metrics:events(),
fun(EventName, Measurements, Metadata, _Config) ->
Data = #{
name => EventName,
measurements => Measurements,
metadata => Metadata
},
TestPid ! {telemetry, Data},
ok
end,
unused_config
),
on_exit(fun() -> telemetry:detach(HandlerId) end),
{ok, C} = emqtt:start_link(#{}),
{ok, _} = emqtt:connect(C),
Payload = fun(Map) -> emqx_utils_json:encode(Map) end,
Offset1 = resolve_kafka_offset(PreConfiguredTopic1),
Offset2 = resolve_kafka_offset(PreConfiguredTopic2),
{ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]),
{ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]),
check_kafka_message_payload(PreConfiguredTopic1, Offset1, <<"p1">>),
check_kafka_message_payload(PreConfiguredTopic2, Offset2, <<"p2">>),
ActionId = emqx_bridge_v2:id(Type, ActionName),
?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)),
?assertEqual(2, emqx_resource_metrics:success_get(ActionId)),
?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)),
?assertReceive(
{telemetry, #{
measurements := #{gauge_set := _},
metadata := #{worker_id := _, resource_id := ActionId}
}}
),
%% If there isn't enough information in the context to resolve to a topic, it
%% should be an unrecoverable error.
?assertMatch(
{_, {ok, _}},
?wait_async_action(
emqtt:publish(C, RuleTopic, Payload(#{not_enough => <<"info">>}), [{qos, 1}]),
#{?snk_kind := "kafka_producer_failed_to_render_topic"}
)
),
%% If it's possible to render the topic, but it isn't in the pre-configured
%% list, it should be an unrecoverable error.
?assertMatch(
{_, {ok, _}},
?wait_async_action(
emqtt:publish(C, RuleTopic, Payload(#{n => 99}), [{qos, 1}]),
#{?snk_kind := "kafka_producer_resolved_to_unknown_topic"}
)
),
ok
end,
[]
),
ok.

Some files were not shown because too many files have changed in this diff.