Compare commits

...

464 Commits

Author SHA1 Message Date
Ivan Dyachkov bcd63344b8
Merge pull request #13583 from id/20240807-sync-release-branches
sync release branches
2024-08-07 11:38:14 +02:00
Ivan Dyachkov cc3b26a3ac Merge remote-tracking branch 'upstream/release-58' into 20240807-sync-release-branches 2024-08-07 09:48:38 +02:00
Ivan Dyachkov dd686c24a0 Merge remote-tracking branch 'upstream/release-57' into 20240807-sync-release-branches 2024-08-07 09:44:38 +02:00
Ivan Dyachkov 592c4e0045
Merge pull request #12774 from emqx/dependabot/github_actions/dot-github/actions/package-macos/actions-package-macos-83d1e47aa6
chore(deps): bump the actions-package-macos group in /.github/actions/package-macos with 1 update
2024-08-07 09:43:53 +02:00
Ivan Dyachkov 073e3ea0a8
Merge pull request #13569 from emqx/dependabot/github_actions/actions-ef71aea555
chore(deps): bump the actions group across 1 directory with 8 updates
2024-08-07 09:35:52 +02:00
Xinyu Liu 81978ceaeb
Merge pull request #13571 from terry-xiaoyu/fast_fail_on_invalid_ssl_opts
chore: update esockd to 5.12.0
2024-08-07 11:21:32 +08:00
Ilia Averianov 6bfddd9952
Merge pull request #13565 from savonarola/0801-shared-subs-compact-structures
Reduce size of shared sub protocol structures
2024-08-06 19:56:08 +03:00
Thales Macedo Garitezi cf608a73a5
Merge pull request #13578 from thalesmg/20240806-r58-port-raft-precond
feat(dsraft): support atomic batches + preconditions (release-58)
2024-08-06 13:40:46 -03:00
Thales Macedo Garitezi a8200fb83d
Merge pull request #13579 from thalesmg/20240806-r58-test-flaky-consumer-rebalance
test: attempt to reduce test flakiness
2024-08-06 13:33:46 -03:00
Ilya Averyanov 9ad65c6ac1 feat(queue): reduce logging levels 2024-08-06 18:45:15 +03:00
Thales Macedo Garitezi 9ca3985bbd test: attempt to reduce test flakiness 2024-08-06 12:44:51 -03:00
Ilya Averyanov e17becb84d feat(queue): compact protocol structures, organize formatting 2024-08-06 18:05:02 +03:00
Andrew Mayorov 5dd8fefded test(ds): avoid side effects in check phase 2024-08-06 11:43:12 -03:00
Andrew Mayorov 7b85faf12a chore(dsraft): fix few spelling errors
Co-Authored-By: Thales Macedo Garitezi <thalesmg@gmail.com>
2024-08-06 11:43:12 -03:00
Andrew Mayorov b0594271b2 chore(dsraft): fix a typespec 2024-08-06 11:43:12 -03:00
Andrew Mayorov d8aa39a310 fix(dsraft): use local application environment 2024-08-06 11:43:12 -03:00
Andrew Mayorov fc0434afc8 chore(dslocal): refine few typespecs 2024-08-06 11:43:12 -03:00
Andrew Mayorov 5502af18b7 feat(ds): support deletions + precondition-related API in bitfield-lts 2024-08-06 11:43:12 -03:00
Andrew Mayorov 9f96e0957e test(ds): verify deletions work predictably 2024-08-06 11:43:12 -03:00
Andrew Mayorov 109ffe7a70 fix(dsbackend): unify timestamp resolution in operations / preconditions 2024-08-06 11:43:12 -03:00
Andrew Mayorov 1559aac486 test(dsbackend): add shared tests for atomic batches + preconditions 2024-08-06 11:43:12 -03:00
Andrew Mayorov 68990f1538 feat(ds): support operations + preconditions in skipstream-lts 2024-08-06 11:43:12 -03:00
Andrew Mayorov 5356d678cc feat(dsraft): support atomic batches + preconditions 2024-08-06 11:43:12 -03:00
Andrew Mayorov 11951f8f6c feat(ds): adopt buffer interface to `emqx_ds:operation()` 2024-08-06 11:43:12 -03:00
Andrew Mayorov 0aa4cdbaf3 feat(ds): add generic preconditions implementation 2024-08-06 11:43:12 -03:00
Ivan Dyachkov 281f8ddc83
Merge pull request #13575 from Kinplemelon/kinple/upgrade-dashboard-58
chore(dashboard): bump dashboard version to v1.10.0-beta.1 & e1.8.0-beta.1
2024-08-06 16:39:06 +02:00
Kinplemelon b80513e941 ci: update emqx docs link in dashboard 2024-08-06 15:21:19 +02:00
Ivan Dyachkov 822ed71282 chore: release 5.7.2 2024-08-06 13:25:56 +02:00
Kinple b8fd5de2a5
Merge pull request #13577 from Kinplemelon/kinple/upgrade-dashboard
chore(dashboard): bump dashboard version to v1.9.2 & e1.7.2
2024-08-06 19:02:50 +08:00
Kinplemelon 3ee84d60ae chore(dashboard): bump dashboard version to v1.9.2 & e1.7.2 2024-08-06 18:11:35 +08:00
Andrew Mayorov 3b52b658cd
Merge pull request #13559 from keynslug/feat/EMQX-12309/raft-precond
feat(dsraft): support atomic batches + preconditions
2024-08-06 09:17:16 +02:00
Kinplemelon cba3dcbeda chore(dashboard): bump dashboard version to v1.10.0-beta.1 & e1.8.0-beta.1 2024-08-06 13:44:16 +08:00
Kinple caf1897979
Merge pull request #13574 from Kinplemelon/kinple/upgrade-dashboard
chore(dashboard): bump dashboard version to e1.7.2-beta.7
2024-08-06 10:51:03 +08:00
Kinplemelon dbbd5e1458 ci: update emqx docs link in dashboard 2024-08-06 09:33:20 +08:00
Kinplemelon 0ab31df9d2 chore(dashboard): bump dashboard version to v1.9.2-beta.1 & e1.7.2-beta.7 2024-08-06 09:32:17 +08:00
Thales Macedo Garitezi 613fc644f5
Merge pull request #13425 from kjellwinblad/kjell/review_connector_error_logs_mqtt_etc/EMQX-12555/EMQX-12657
fix: make MQTT connector error log messages easier to understand
2024-08-05 17:34:13 -03:00
Andrew Mayorov b1a53568d6
test(ds): avoid side effects in check phase 2024-08-05 16:34:17 +02:00
Ivan Dyachkov d6651a1889
Merge pull request #13572 from id/20240805-prep-5.8.0-alpha.1
chore: prepare 5.8.0-alpha.1
2024-08-05 16:05:42 +02:00
Ivan Dyachkov 4cf7151139 chore: prepare 5.8.0-alpha.1 2024-08-05 11:09:07 +02:00
Ivan Dyachkov 4865999606 Merge remote-tracking branch 'upstream/master' into release-58 2024-08-05 10:59:59 +02:00
Andrew Mayorov 382feab7d1
chore(dsraft): fix few spelling errors
Co-Authored-By: Thales Macedo Garitezi <thalesmg@gmail.com>
2024-08-05 10:55:49 +02:00
Andrew Mayorov 6aad774075
chore(dsraft): fix a typespec 2024-08-05 10:55:49 +02:00
Andrew Mayorov 649cbf1c79
fix(dsraft): use local application environment 2024-08-05 10:55:49 +02:00
Andrew Mayorov 4cde5e98a3
chore(dslocal): refine few typespecs 2024-08-05 10:55:48 +02:00
Andrew Mayorov d631b5b296
feat(ds): support deletions + precondition-related API in bitfield-lts 2024-08-05 10:55:48 +02:00
Andrew Mayorov 26ec69d5f4
test(ds): verify deletions work predictably 2024-08-05 10:55:48 +02:00
Andrew Mayorov 58b9ab0210
fix(dsbackend): unify timestamp resolution in operations / preconditions 2024-08-05 10:55:22 +02:00
lafirest 4644072fd8
Merge pull request #13570 from lafirest/fix/api_key_bootstrap
fix(api_key): do not crash boot when the bootstrap file does not exist
2024-08-05 16:33:43 +08:00
Shawn bd87e3ce2b chore: update esockd to 5.12.0 2024-08-05 16:18:04 +08:00
firest c9c4d1a196 fix(api_key): do not crash boot when the bootstrap file does not exist 2024-08-05 15:56:05 +08:00
dependabot[bot] 11546b72f4
chore(deps): bump the actions group across 1 directory with 8 updates
Bumps the actions group with 8 updates in the / directory:

| Package | From | To |
| --- | --- | --- |
| [actions/checkout](https://github.com/actions/checkout) | `4.1.2` | `4.1.7` |
| [actions/upload-artifact](https://github.com/actions/upload-artifact) | `4.3.3` | `4.3.5` |
| [actions/download-artifact](https://github.com/actions/download-artifact) | `4.1.7` | `4.1.8` |
| [docker/setup-qemu-action](https://github.com/docker/setup-qemu-action) | `3.0.0` | `3.2.0` |
| [docker/setup-buildx-action](https://github.com/docker/setup-buildx-action) | `3.3.0` | `3.6.1` |
| [docker/login-action](https://github.com/docker/login-action) | `3.2.0` | `3.3.0` |
| [erlef/setup-beam](https://github.com/erlef/setup-beam) | `1.18.0` | `1.18.1` |
| [ossf/scorecard-action](https://github.com/ossf/scorecard-action) | `2.3.3` | `2.4.0` |



Updates `actions/checkout` from 4.1.2 to 4.1.7
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4.1.2...692973e3d937129bcbf40652eb9f2f61becf3332)

Updates `actions/upload-artifact` from 4.3.3 to 4.3.5
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](65462800fd...89ef406dd8)

Updates `actions/download-artifact` from 4.1.7 to 4.1.8
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](65a9edc588...fa0a91b85d)

Updates `docker/setup-qemu-action` from 3.0.0 to 3.2.0
- [Release notes](https://github.com/docker/setup-qemu-action/releases)
- [Commits](68827325e0...49b3bc8e6b)

Updates `docker/setup-buildx-action` from 3.3.0 to 3.6.1
- [Release notes](https://github.com/docker/setup-buildx-action/releases)
- [Commits](d70bba72b1...988b5a0280)

Updates `docker/login-action` from 3.2.0 to 3.3.0
- [Release notes](https://github.com/docker/login-action/releases)
- [Commits](0d4c9c5ea7...9780b0c442)

Updates `erlef/setup-beam` from 1.18.0 to 1.18.1
- [Release notes](https://github.com/erlef/setup-beam/releases)
- [Commits](a6e26b2231...b9c58b0450)

Updates `ossf/scorecard-action` from 2.3.3 to 2.4.0
- [Release notes](https://github.com/ossf/scorecard-action/releases)
- [Changelog](https://github.com/ossf/scorecard-action/blob/main/RELEASE.md)
- [Commits](dc50aa9510...62b2cac7ed)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: actions/upload-artifact
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: actions/download-artifact
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: docker/setup-qemu-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
- dependency-name: docker/setup-buildx-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
- dependency-name: docker/login-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
- dependency-name: erlef/setup-beam
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions
- dependency-name: ossf/scorecard-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: actions
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-08-05 03:25:47 +00:00
dependabot[bot] bcb70a9fb9
chore(deps): bump the actions-package-macos group
Bumps the actions-package-macos group in /.github/actions/package-macos with 1 update: [actions/cache](https://github.com/actions/cache).


Updates `actions/cache` from 4.0.1 to 4.0.2
- [Release notes](https://github.com/actions/cache/releases)
- [Changelog](https://github.com/actions/cache/blob/main/RELEASES.md)
- [Commits](ab5e6d0c87...0c45773b62)

---
updated-dependencies:
- dependency-name: actions/cache
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: actions-package-macos
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-08-05 03:17:26 +00:00
JimMoen 09ec31908b
Merge pull request #13357 from JimMoen/fix-utf8-frame-error-connack
Stop returning `CONNACK` or `DISCONNECT` to clients that sent malformed CONNECT packets.

- Only send `CONNACK` with reason code `frame_too_large` for MQTT-v5.0 when connecting if the protocol version field in CONNECT can be detected.
- Otherwise, **DO NOT** send any CONNACK or DISCONNECT packet.
2024-08-02 15:24:30 +08:00
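The branching described in PR #13357 above — reply with a v5 CONNACK carrying reason code 0x95 (Packet too large) only when the protocol version could be parsed from the malformed CONNECT, otherwise stay silent — can be sketched roughly as follows. This is an illustrative sketch only, not the actual `emqx_channel`/`emqx_frame` code; the module and function names are made up.

```
%% Illustrative sketch only; not the actual EMQX frame-error handling code.
-module(frame_error_sketch).
-export([reply_for_frame_too_large/1]).

%% Decide what (if anything) to send back before closing the connection,
%% given the protocol version detected from the malformed CONNECT packet.
-spec reply_for_frame_too_large({ok, pos_integer()} | undefined) ->
          {connack, byte()} | none.
reply_for_frame_too_large({ok, 5}) ->
    %% MQTT v5: CONNACK with reason code 16#95 (Packet too large).
    {connack, 16#95};
reply_for_frame_too_large(_UnknownProtoVer) ->
    %% Protocol version could not be detected: send no CONNACK/DISCONNECT,
    %% just close the connection.
    none.
```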
lafirest b94ec4014f
Merge pull request #13563 from lafirest/fix/payload_encode
fix(log): respect payload encoding settings when formatting packets
2024-08-02 14:38:12 +08:00
firest 74c346f9d1 fix(log): respect payload encoding settings when formatting packets 2024-08-02 12:41:30 +08:00
zhongwencool 8a33ef8576
Merge pull request #13562 from zhongwencool/fix-deactivate-alarm
fix: deactivate alarm before create resource
2024-08-02 12:08:27 +08:00
zhongwencool 6c2033ecbf fix: deactivate alarm before create resource 2024-08-02 11:03:59 +08:00
zmstone 51530588ef ci: fix a typo in commented out docker-compose yaml file 2024-08-01 22:41:42 +02:00
Thales Macedo Garitezi bba9d085d6 test: refactor test structure 2024-08-01 16:03:04 -03:00
Thales Macedo Garitezi 3162fe7a27 feat: prettify some error explanations 2024-08-01 15:31:00 -03:00
Thales Macedo Garitezi 52b2d73b28 test: move new test to newer module and use current apis 2024-08-01 15:13:25 -03:00
Thales Macedo Garitezi 44e7f2e9b2 refactor: use macros for status to avoid typos 2024-08-01 14:49:43 -03:00
Thales Macedo Garitezi baf2b96cbc test: refactor test structure 2024-08-01 14:27:25 -03:00
Kjell Winblad ba2d4f3df3 docs: add change log entry 2024-08-01 14:21:27 -03:00
Kjell Winblad 11aaa7b07d fix: make MQTT connector error log messages easier to understand
Fixes:
https://emqx.atlassian.net/browse/EMQX-12555
https://emqx.atlassian.net/browse/EMQX-12657
2024-08-01 14:21:26 -03:00
Thales Macedo Garitezi 4250d01363
Merge pull request #13546 from thalesmg/20240730-r58-pulsar-query-mode
feat: expose `resource_opts.query_mode` for pulsar action
2024-08-01 14:19:16 -03:00
Thales Macedo Garitezi 86853ac6ef
Merge pull request #13545 from thalesmg/20240730-m-connector-jwt-app
refactor: move JWT worker and helpers to separate app
2024-08-01 13:06:27 -03:00
Andrew Mayorov 810a4d3cf9
test(dsbackend): add shared tests for atomic batches + preconditions 2024-08-01 14:26:45 +02:00
Andrew Mayorov 7b243ef7ad
feat(ds): support operations + preconditions in skipstream-lts 2024-08-01 14:26:45 +02:00
Andrew Mayorov fcf76d28ba
feat(dsraft): support atomic batches + preconditions 2024-08-01 14:26:45 +02:00
Andrew Mayorov 3b5d98c1d9
feat(ds): adopt buffer interface to `emqx_ds:operation()` 2024-08-01 14:26:45 +02:00
Andrew Mayorov 451b03ff99
feat(ds): add generic preconditions implementation 2024-08-01 14:26:45 +02:00
JimMoen f792418a68
Merge pull request #13552 from JimMoen/fix-plugin-app-takes-too-long
fix: add a startup timeout limit for the plugin application
2024-08-01 16:46:09 +08:00
JimMoen 4915cc0da6
chore: add changelog entry for 13357 2024-08-01 15:23:58 +08:00
JimMoen 15b3f4deb0
fix: rm unused func and exports 2024-08-01 15:00:24 +08:00
JimMoen 7a251c9ead
test: handle frame error for CONNECT packets 2024-08-01 10:26:31 +08:00
JimMoen 37a89d0094
fix: enrich parse_state and connection serialize opts 2024-08-01 10:26:31 +08:00
JimMoen c313aa89f0
fix: try throw proto_ver and proto_name when parsing CONNECT packet 2024-08-01 10:26:31 +08:00
JimMoen 6db1c0a446
refactor: separate function to handle `frame_error` 2024-08-01 10:26:31 +08:00
JimMoen d4508a4f1d
chore: sync master `elvis.config` 2024-08-01 10:26:31 +08:00
Thales Macedo Garitezi a6a9538e73 refactor: move JWT worker and helpers to separate app
Some bridge applications might need to use JWTs before the `emqx_connector` is started, so
we must move JWT table initialization to a separate dependency application.
2024-07-31 14:52:12 -03:00
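The commit body above explains why the JWT helpers had to live in their own OTP application: some bridge applications need JWTs before `emqx_connector` is started. A hedged illustration of what that dependency looks like in practice is below; the application names (`emqx_connector_jwt`, `emqx_bridge_example`) are assumptions for the example, not taken from the diff.

```
%% Hypothetical <bridge>.app.src entry: by listing the (assumed) standalone
%% JWT app as a dependency, OTP starts it -- and lets it create its JWT
%% table -- before the bridge application boots.
{application, emqx_bridge_example, [
    {description, "Example bridge that needs JWTs during startup"},
    {vsn, "0.1.0"},
    {registered, []},
    {applications, [
        kernel,
        stdlib,
        emqx_connector_jwt    %% assumed name of the new standalone JWT app
    ]},
    {env, []}
]}.
```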
Thales Macedo Garitezi 9f97bff7d0 feat: expose `resource_opts.query_mode` for pulsar action
Fixes https://emqx.atlassian.net/browse/EMQX-12782
2024-07-31 11:13:11 -03:00
Ivan Dyachkov 577f1a7d8a
Merge pull request #13553 from id/20240731-ci-fix-docker-build
ci: fix docker images build
2024-07-31 16:04:47 +02:00
Ivan Dyachkov e42021d314
Merge pull request #13554 from id/20240731-sync-release-57
sync release-57
2024-07-31 15:48:37 +02:00
Thales Macedo Garitezi 08c58cc319
Merge pull request #13543 from thalesmg/20240730-r57-sr-delete-protobuf-cache
fix(schema registry): clear protobuf code cache when deleting/updating serdes
2024-07-31 10:16:48 -03:00
Thales Macedo Garitezi 150fee87f1
Merge pull request #13541 from thalesmg/20240730-r57-unset-crl-check-listener
fix(crl): force remove CRL fields from SSL opts after listener update
2024-07-31 10:16:35 -03:00
ieQu1 6058b50c91
Merge pull request #13555 from ieQu1/ds-rest-404
fix(mgmt): Return 404 for /ds/ API endpoints when DS is disabled
2024-07-31 14:57:17 +02:00
Thales Macedo Garitezi 85cff5e7eb fix: merge conflicts 2024-07-31 09:14:29 -03:00
ieQu1 569f48f5a1
fix(mgmt): Return 404 for /ds/ API endpoints when DS is disabled 2024-07-31 13:44:38 +02:00
ieQu1 2cf86e76ee
Merge pull request #13551 from ieQu1/EMQX-12587
fix(sessds): Expose durable sessions in the config API
2024-07-31 12:00:26 +02:00
Ivan Dyachkov 74cef7937d Merge remote-tracking branch 'upstream/release-57' into 20240731-sync-release-57 2024-07-31 11:31:29 +02:00
JimMoen c658cfe269
fix: make static_check happy 2024-07-31 17:17:13 +08:00
JimMoen a246551914
fix: add a startup timeout limit for the plugin application 2024-07-31 17:17:11 +08:00
JimMoen b1c8bc2421
Merge pull request #13548 from JimMoen/feat-plugin-on-config-changed-callback
feat: call plugin's app module `on_config_changed/2` callback
2024-07-31 16:40:48 +08:00
ieQu1 200b5ab294
Merge pull request #13550 from ieQu1/no-ra-dependency-on-oss
chore(emqx): Remove ra from the list of EMQX dependencies
2024-07-31 10:30:44 +02:00
Ivan Dyachkov 8d8ff6cf5d ci: fix docker images build
/etc/docker/daemon.json requires root for read access
2024-07-31 10:27:04 +02:00
ieQu1 a23b8266b1
fix(sessds): Expose durable sessions in the config API 2024-07-31 10:18:38 +02:00
ieQu1 d69342a2fc
chore(emqx): Remove ra from the list of EMQX dependencies 2024-07-31 09:56:28 +02:00
JimMoen e6bfc14cc9
fix: try-catch optional `on_config_changed/2` plugin app callback 2024-07-31 10:09:02 +08:00
JimMoen 3d1f0c756c
feat: call plugin's app module `on_config_changed/2` callback
assumes the module is named `[PluginName]_app`
2024-07-31 10:09:02 +08:00
Thales Macedo Garitezi 83041a8b83
Merge pull request #13544 from thalesmg/20240730-m-test-flaky-client-v2
test(clients v2 api): attempt to reduce flakiness
2024-07-30 16:07:37 -03:00
Thales Macedo Garitezi 1c4402b12c test(clients v2 api): attempt to reduce flakiness
https://github.com/emqx/emqx/actions/runs/10161391242/job/28101183920#step:6:331
2024-07-30 14:07:08 -03:00
Thales Macedo Garitezi ebb69f4ebf fix(crl): force remove crl fields from SSL opts after listener update
Fixes https://emqx.atlassian.net/browse/EMQX-12785
2024-07-30 14:00:24 -03:00
Thales Macedo Garitezi fd961f9da7 fix(schema registry): clear protobuf code cache when deleting/updating serde
Fixes https://emqx.atlassian.net/browse/EMQX-12789
2024-07-30 13:52:34 -03:00
Ilia Averianov 359bc38aa4
Merge pull request #13407 from savonarola/0701-shared-sub
Implement shared subscriptions
2024-07-30 16:12:13 +03:00
Ilya Averyanov 08f70e4a25 feat(queue): move ds shared sub dependent test to emqx_ds_shared_sub app 2024-07-30 14:19:39 +03:00
Ilya Averyanov e408804efb feat(queue): fix dialyzer issues 2024-07-30 13:01:48 +03:00
Ilya Averyanov e294d35703 feat(queue): add schema descriptions 2024-07-30 13:01:48 +03:00
Ilya Averyanov 303ff95e10 feat(queue): add stub for CRUD API 2024-07-30 13:01:48 +03:00
Ilya Averyanov 23f0e88b45 feat(queue): add integration with external broker 2024-07-30 13:01:46 +03:00
Ilya Averyanov f0dd1bc4f4 feat(queue): add shared sub support to the management API 2024-07-30 13:01:20 +03:00
Ilya Averyanov 9b30320ddb feat(queue): simplify progress report on disconnect 2024-07-30 13:01:20 +03:00
Ilya Averyanov cae27293a5 feat(queue): move route registration to sessions 2024-07-30 13:01:19 +03:00
Ilya Averyanov 81f4103d60 feat(queue): avoid cyclic dependencies 2024-07-30 13:01:19 +03:00
Ilya Averyanov bab526be24 feat(queue): self-revoke all shared streams on session open 2024-07-30 13:01:19 +03:00
Ilya Averyanov 9307a82004 feat(queue): rearrange leader's code 2024-07-30 13:01:19 +03:00
Ilya Averyanov b8e8f7c8e0 feat(queue): add pre_renew_streams callback 2024-07-30 13:01:18 +03:00
Ilya Averyanov a97a0d6400 feat(queue): fix dialyzer issues 2024-07-30 13:01:18 +03:00
Ilya Averyanov 8705956cdc feat(queue): update docs 2024-07-30 13:01:18 +03:00
Ilya Averyanov f213569460 feat(queue): clarify naming; identify shared subs by full topic filter 2024-07-30 13:01:18 +03:00
Ilya Averyanov 7e23f8d19f feat(queue): fix include 2024-07-30 13:01:17 +03:00
Ilya Averyanov a676ede6b8 feat(queue): improve logging 2024-07-30 13:01:17 +03:00
Ilya Averyanov 9e5e7a23c5 feat(queue): remove unnecessary acked flag 2024-07-30 13:01:17 +03:00
Ilya Averyanov 143086b0ef feat(queue): replace invalid rewind algorithm with skipping iterator 2024-07-30 13:01:16 +03:00
Ilya Averyanov c569625dd1 feat(queue): handle partially unacked ranges 2024-07-30 13:01:16 +03:00
Ilya Averyanov 7daab1ab23 feat(queue): move replay progress to a separate data structure 2024-07-30 13:01:16 +03:00
Ilya Averyanov 077ee38530 feat(queue): add config 2024-07-30 13:01:15 +03:00
Ilya Averyanov b74189570d feat(queue): do not use ee app from emqx app 2024-07-30 13:01:15 +03:00
Ilya Averyanov 649cf88042 feat(queue): kick agents that do not return to the replaying state for long 2024-07-30 13:01:15 +03:00
Ilya Averyanov 1496f7f778 feat(queue): add leader_rank_progress test 2024-07-30 13:01:15 +03:00
Ilya Averyanov 91dd1183ad feat(queue): fix dialyzer issues 2024-07-30 13:01:14 +03:00
Ilya Averyanov 65ab81ff74 feat(queue): fix quick resubscription 2024-07-30 13:01:14 +03:00
Ilya Averyanov 53d4cd3174 feat(queue): rename leader's stream_progresses to stream_states 2024-07-30 13:01:14 +03:00
Ilya Averyanov 7d004b37da feat(queue): implement stream finalization 2024-07-30 13:01:13 +03:00
Ilya Averyanov e5547005eb feat(queue): implement resubscribe test 2024-07-30 13:01:13 +03:00
Ilya Averyanov fada2a3fea feat(queue): reorganize and document shared subs module 2024-07-30 13:01:13 +03:00
Ilya Averyanov b4a010d63b feat(queue): implement unsubscribe 2024-07-30 13:01:13 +03:00
Ilya Averyanov 9bde981c44 feat(queue): fix static check issues 2024-07-30 13:01:12 +03:00
Ilya Averyanov 7658e081c5 feat(queue): move design docs to the EIP 2024-07-30 13:01:12 +03:00
Ilya Averyanov 8dce530d15 feat(queue): fix progress reporting and more tests
We test reassignment during the intensive replay
2024-07-30 13:01:12 +03:00
Ilya Averyanov a20d262327 feat(queue): send progress before fetching new messages 2024-07-30 13:01:11 +03:00
Ilya Averyanov d32f282feb feat(queue): add graceful disconnect 2024-07-30 13:01:11 +03:00
Ilya Averyanov 1d728a05b2 feat(queue): send metadata with agent when connecting to leader
It will be used to attach agent taints to improve stream assignment.
2024-07-30 13:01:11 +03:00
Ilya Averyanov 49bff5c08a feat(queue): wrap remote calls in a proto 2024-07-30 13:01:10 +03:00
Ilya Averyanov 61eda0ff31 feat(queue): identify agents by SessionId in tests 2024-07-30 13:01:10 +03:00
Ilya Averyanov 8f0d807c00 feat(queue): add new test scenarios 2024-07-30 13:01:10 +03:00
Ilya Averyanov bceb5d43ed feat(queue): fix stream rebalancing issues, update tests 2024-07-30 13:01:10 +03:00
Ilya Averyanov 03fea34962 feat(queue): document protocol between agent and leader
Document leader's states
2024-07-30 13:01:09 +03:00
Ilya Averyanov 082514f557 feat(queue): implement full protocol between agent and leader 2024-07-30 13:01:09 +03:00
Ilya Averyanov c831f0772f feat(queue): handle renew_lease_timeout 2024-07-30 13:01:09 +03:00
Ivan Dyachkov ca455ad992
Merge pull request #13532 from emqx/sync-release-57-20240729-021938
Sync release-57
2024-07-30 10:49:09 +02:00
JianBo He c347c2c285
Merge pull request #13540 from zmstone/0729-rule-funciton-getenv-should-be-limited-to-vars-with-prefix-EMQXVAR_
refactor: force getenv to access only OS env with prefix EMQXVAR_
2024-07-30 08:23:24 +08:00
zmstone a49cd78aae refactor: force getenv to access only OS env with prefix EMQXVAR_ 2024-07-29 23:54:00 +02:00
zmstone 4065158be7
Merge pull request #13534 from JimMoen/feat-add-superuser-skip-authz
feat: add authz skipped trace for superuser
2024-07-29 22:30:13 +02:00
ieQu1 18721d05bc
Merge pull request #13526 from ieQu1/replicant-ee
fix(mria): Reserve replicant role for EE only
2024-07-29 22:10:06 +02:00
zmstone 7f7d0741d2
Merge pull request #13528 from zmstone/0726-unrecoverable-error-limit
0726 unrecoverable error limit
2024-07-29 21:32:14 +02:00
zmstone 2e39c4ad5e
Merge pull request #13495 from thalesmg/20240719-m-hide-enable-fields-mkII
chore: hide enable flags from schema and config examples
2024-07-29 21:31:20 +02:00
zmstone 5b50d5433a
Merge pull request #13537 from thalesmg/20240729-r57-auto-decode-payload-kprodu
feat: attempt to automatically decode `payload` similar to key and message templates
2024-07-29 21:03:41 +02:00
zmstone eab440e0c1 docs: add changelog for PR 13528 2024-07-29 20:41:57 +02:00
zmstone e08425e67d refactor(log-throttler): remove unnecessary code
there is no need to reset counters before erasing
2024-07-29 20:41:27 +02:00
zhongwencool f6f1d32da0 feat: throttle with resource_id 2024-07-29 20:41:27 +02:00
zhongwencool 2924ec582a feat: add unrecoverable_resource_error throttle 2024-07-29 20:41:27 +02:00
zhongwencool 8dc1d1424a chore: add resource tag for log 2024-07-29 20:41:27 +02:00
Thales Macedo Garitezi 693d5dd394 feat: attempt to automatically decode `payload` similar to key and message templates 2024-07-29 13:01:06 -03:00
ieQu1 f85db0a0e9
fix: Apply suggestions from code review
Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
2024-07-29 17:22:41 +02:00
lafirest 60aefd1065
Merge pull request #13520 from lafirest/feat/scram-rest-acl
feat(scram): supports ACL rules in `scram_restapi` backend
2024-07-29 22:46:50 +08:00
JianBo He c637422302
Merge pull request #13518 from thalesmg/20240724-r57-dynamic-kprodu-action-mkIII
feat(kafka producer): allow dynamic topics (mkIII)
2024-07-29 22:43:20 +08:00
Thales Macedo Garitezi e80d43d14d test(fix): use ebin path without plugins
Without the filtering that already exists in cth:ebin_path, the rebar3 plugins path may
take priority over normal dependencies.  Since we just updated hocon, and there seems to
be an older hocon among the rebar3 plugins, it started to break the test because the older
hocon was getting loaded in the peer.
2024-07-29 09:50:25 -03:00
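The commit above describes a classic code-path shadowing problem: rebar3 plugin ebin directories end up ahead of the real dependencies, so an older `hocon` gets loaded in the peer node. A minimal sketch of the kind of filtering it refers to (a fragment; names are illustrative, the real helper lives in the `cth` test utilities) could look like this:

```
%% Illustrative fragment: keep only ebin directories that do not come from
%% the rebar3 plugins tree, so plugin-bundled copies of deps cannot shadow
%% the versions the build actually depends on.
ebin_paths_without_plugins() ->
    [Dir || Dir <- code:get_path(),
            string:find(Dir, "plugins") =:= nomatch].
```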
Thales Macedo Garitezi b3074144cc chore: temporarily revert `NO_DOC` changes to fields with default value = false
These will be dealt with in follow up PRs, by allowing the parent struct to be set to a
special `disabled` value in such cases.
2024-07-29 09:50:25 -03:00
Thales Macedo Garitezi 6786c9b517
refactor: improve descriptions and identifiers
Co-authored-by: zmstone <zmstone@gmail.com>
2024-07-29 09:45:52 -03:00
Thales Macedo Garitezi 8913de10c0
Merge pull request #13527 from thalesmg/20240726-r57-multiple-froms-sql-test
fix(rule engine tester): fix message publish with bridge source in from clause
2024-07-29 09:37:17 -03:00
JimMoen 5ddd7d7a6a
test: superuser skipped all authz check 2024-07-29 17:27:36 +08:00
JimMoen d7cac74bed
feat: add authz skipped trace 2024-07-29 17:27:36 +08:00
JianBo He 0b0a28ae44
chore: update changes/ee/feat-13504.en.md
Co-authored-by: zmstone <zmstone@gmail.com>
2024-07-29 10:45:24 +08:00
id c1e2801f41 Merge remote-tracking branch 'origin/release-57' into sync-release-57-20240729-021938 2024-07-29 02:19:38 +00:00
ieQu1 8036baf22c
test(paho): Run RLOG paho test with replicants only on EE 2024-07-26 21:53:32 +02:00
ieQu1 268f887700
test(mgmt): Disable certain tests on OSS 2024-07-26 20:24:53 +02:00
Thales Macedo Garitezi 1d56ac6e5e refactor: change topic schema type 2024-07-26 14:26:21 -03:00
Thales Macedo Garitezi 4e0742c66f feat: make kafka producer freely dynamic 2024-07-26 14:25:20 -03:00
ieQu1 8c1302f455
test(conf_app): Remove redundant config 2024-07-26 17:25:09 +02:00
ieQu1 b8a2a8ea18
test(router): Skip certain tests on OSS 2024-07-26 17:25:09 +02:00
ieQu1 b7c424a13d
test(persmsg): Remove redundant config 2024-07-26 17:17:06 +02:00
ieQu1 1b6494ab9a
test(mgmt): Remove redundant config 2024-07-26 17:17:06 +02:00
ieQu1 41bf5cd6ca
test(otel): Remove redundant config 2024-07-26 17:17:06 +02:00
ieQu1 548bcceab7
test(auth): Remove redundant config 2024-07-26 17:17:06 +02:00
ieQu1 1beda1cd11
test(mria): Remove role from the example config 2024-07-26 17:17:06 +02:00
ieQu1 9da744c423
fix(mria): Reserve replicant role for EE only 2024-07-26 17:17:06 +02:00
lafirest b2f2af6871
Merge pull request #13523 from lafirest/fix/oidc
fix(oidc): fixed update and callback errors for OIDC
2024-07-26 21:09:11 +08:00
Thales Macedo Garitezi 3fae704903 fix(rule engine tester): fix message publish with bridge source in from clause
Fixes https://emqx.atlassian.net/browse/EMQX-12762
2024-07-26 09:27:16 -03:00
lafirest 2d6b2bff8e
Merge pull request #13524 from lafirest/feat/exclusive-cli
feat(exclusive): added CLI interface for exclusive topics
2024-07-26 19:54:03 +08:00
firest dc342a35ac chore: update changes 2024-07-26 17:18:52 +08:00
firest 397c104a85 feat(exclusive): added CLI interface for exclusive topics 2024-07-26 16:56:47 +08:00
firest 49b24a3049 fix(oidc): fixed update and callback errors for OIDC 2024-07-26 15:41:22 +08:00
firest 7bf70aaab6 feat(scram): supports ACL rules in `scram_restapi` backend 2024-07-26 14:30:28 +08:00
zmstone 9a5d50f26a
Merge pull request #13521 from zmstone/0725-add-ldap-reconnect-on-timeout
0725 add ldap reconnect on timeout
2024-07-26 08:29:02 +02:00
Thales Macedo Garitezi df1f4fad70 feat(kafka producer): allow dynamic topics from pre-configured topics
Fixes https://emqx.atlassian.net/browse/EMQX-12656
2024-07-25 15:33:12 -03:00
Thales Macedo Garitezi 39b8cb1789
Merge pull request #13487 from thalesmg/20240715-m-refactor-cluster-link-api
feat(cluster link): refactor http api, add status and metrics
2024-07-25 14:51:36 -03:00
Thales Macedo Garitezi 33eccb35da chore: update wolff -> 3.0.2 2024-07-25 14:30:50 -03:00
zmstone f6a0f56771 docs: add changelog for PR 13521 2024-07-25 19:24:52 +02:00
zmstone 7631420eef test: add test case to cover ldap search timeout 2024-07-25 18:43:37 +02:00
zmstone 8f94e9684c fix: handle ldap search error 2024-07-25 18:42:09 +02:00
zmstone 43f799508a chore: add ldap test doc 2024-07-25 18:42:08 +02:00
Thales Macedo Garitezi 03821c7b49 fix(cluster link metrics): route count metric is cluster-wide 2024-07-25 13:12:08 -03:00
Thales Macedo Garitezi 6da71200f3 refactor: improve bookkeeping api 2024-07-25 13:12:08 -03:00
Thales Macedo Garitezi 6dbf015c93 refactor: demote hidden config to hardcoded value 2024-07-25 13:12:08 -03:00
Thales Macedo Garitezi 30259284d1 chore: namespace metrics by type 2024-07-25 13:12:08 -03:00
Thales Macedo Garitezi 87e4e2340d refactor: better metric and error fold 2024-07-25 13:12:08 -03:00
lafirest 1925ed2f55
Merge pull request #13507 from lafirest/feat/env_func
feat(variform): add a builtin function to get env vars
2024-07-25 22:46:50 +08:00
lafirest a45f817f0e
Merge pull request #13515 from lafirest/fix/exclusive
fix(exclusive): allow the same client to resubscribe to an existing exclusive topic
2024-07-25 21:26:08 +08:00
firest 57959ac7d4 chore: update changes 2024-07-25 18:59:40 +08:00
firest 79020b2436 feat(variform): add a builtin function to get env vars 2024-07-25 18:50:53 +08:00
firest 141d8144e4 fix(scram): change the name from `scram_http` to `scram_restapi` 2024-07-25 17:01:49 +08:00
firest 4f21594707 chore: update changes 2024-07-25 09:40:20 +08:00
firest 117c8197d7 fix(exclusive): allow the same client to resubscribe to an existing exclusive topic 2024-07-25 09:40:15 +08:00
Thales Macedo Garitezi b283a8c1ff
Merge pull request #13505 from thalesmg/20240722-m-rule-conn-deps-part-2
feat(rule engine api): add filters options for action and source ids
2024-07-24 16:52:47 -03:00
Thales Macedo Garitezi c728b98e79
Merge pull request #13510 from thalesmg/20240723-r57-fix-jwt-about-to-expire-check
fix(jwt): fix grace period for renewal check
2024-07-24 16:52:35 -03:00
Thales Macedo Garitezi dda73651c5 fix(cluster link metrics): use periodic full table scan and gauge to count routes 2024-07-24 16:46:04 -03:00
Ivan Dyachkov c31e28153f
Merge pull request #13513 from id/20240724-sync-release-57
sync release-57
2024-07-24 20:12:06 +02:00
Thales Macedo Garitezi 7829838dc5 feat(cluster link api): add forwarding resource metrics to response 2024-07-24 14:53:57 -03:00
Thales Macedo Garitezi 80e035f115 feat(rule engine api): add filters options for action and source ids
Fixes https://emqx.atlassian.net/browse/EMQX-12654 (requirement 2)
2024-07-24 13:32:50 -03:00
Thales Macedo Garitezi 34f5a886ce refactor(cluster link api): return erpc errors in status and metrics responses 2024-07-24 12:07:34 -03:00
Thales Macedo Garitezi 79db2e6d7f test: fix flaky test 2024-07-24 11:17:00 -03:00
Thales Macedo Garitezi 3e4eeddb78 fix: add missing `resource_type` callback implementations 2024-07-24 10:53:33 -03:00
Thales Macedo Garitezi d2da311416 fix(resource): create undocumented callback
Created by https://github.com/emqx/emqx/pull/13449 but not added as a callback.
2024-07-24 10:53:33 -03:00
Thales Macedo Garitezi 76e51fa532 fix: correctly use maybe match clause 2024-07-24 10:17:45 -03:00
Thales Macedo Garitezi 82bb876de0
docs: improve descriptions
Co-authored-by: Andrew Mayorov <encube.ul@gmail.com>
2024-07-24 10:15:01 -03:00
Thales Macedo Garitezi 2d507146ab refactor: change style of case clause 2024-07-24 10:13:48 -03:00
Thales Macedo Garitezi 216a6abed9 refactor: rename CRUD functions 2024-07-24 10:11:03 -03:00
Thales Macedo Garitezi ca2d4ad2a0 refactor: move metrics logic to separate module 2024-07-24 10:04:27 -03:00
Thales Macedo Garitezi 311419f621
Merge pull request #13489 from thalesmg/20240718-m-init-debug
feat(bin/emqx): add `-init_debug` system arg when `DEBUG=2`
2024-07-24 09:16:12 -03:00
Thales Macedo Garitezi 9a950571d8
Merge pull request #13492 from thalesmg/20240718-m-rules-conn-deps
feat: return dependent entities in connectors/actions/sources API
2024-07-24 09:16:00 -03:00
Thales Macedo Garitezi 9e65e0d048
Merge pull request #13503 from thalesmg/20240722-r57-resource-manager-hc-interval-startup
fix(connector resource): use configuration `resource_opts` for health check interval when starting up
2024-07-24 09:15:47 -03:00
ieQu1 d1edf8aad2
Merge pull request #13514 from ieQu1/skip-streams-improvement
fix(ds): Improve logic of skipstream LTS layout
2024-07-24 13:28:44 +02:00
ieQu1 b010efb647
fix(ds): Improve logic of skipstream LTS layout
Iterators:
Previously it used timestamp of the next message as a reference. This
won't work well for the upcoming beamformer/beamsplitter feature. This
commit changes the logic so iterators store timestamp of the last seen
message.

Cooked batches:
Cooked batches no longer store index entries. Creation of indexes has
been delegated to commit callback.
2024-07-24 10:32:06 +02:00
Ivan Dyachkov 606d829256 Merge remote-tracking branch 'upstream/release-57' into 20240724-sync-release-57 2024-07-24 10:28:00 +02:00
zhongwencool c7a7658c7a
Merge pull request #13449 from zhongwencool/resource-log
feat: add group/type to resource slog
2024-07-24 14:34:25 +08:00
zhongwencool 4d7535df2d chore: use pgsql to replace postgresql 2024-07-24 13:49:31 +08:00
lafirest 8a344a8646
Merge pull request #13504 from lafirest/feat/scram-http
feat(authn): added an HTTP backend for the authentication mechanism scram
2024-07-24 10:28:57 +08:00
Thales Macedo Garitezi 7374123c5c fix(jwt): fix grace period for renewal check 2024-07-23 17:25:29 -03:00
Thales Macedo Garitezi 9c0f1df8a3
Merge pull request #13506 from thalesmg/20240722-m-peername-sys-events
feat: add `peername` to rule events that already have `peerhost`
2024-07-23 09:38:57 -03:00
zhongwencool e7d07ea17c feat: add resource_type to emqx_resource behaviour 2024-07-23 18:24:28 +08:00
firest 7bf270a242 chore: update changes 2024-07-23 16:08:03 +08:00
firest 878b218692 feat(authn): added an HTTP backend for the authentication mechanism scram 2024-07-23 16:07:32 +08:00
zhongwencool e74a921d33 chore: compile error 2024-07-23 15:14:42 +08:00
zhongwencool 2a58a36e37 chore: add resource tag for log 2024-07-23 15:14:42 +08:00
zhongwencool 2bb062d3a3 chore: create_local/5 for emqx_resource_proto_v1 2024-07-23 15:14:42 +08:00
zhongwencool f29988ed8e chore: add tag to resource log 2024-07-23 15:14:42 +08:00
zhongwencool e148d903e8 feat: log resource_id 2024-07-23 15:14:42 +08:00
zhongwencool 0a04b1ad6e feat: add group/type to resource slog 2024-07-23 15:14:41 +08:00
zhongwencool cba3f532f8 feat: don't record dry_run log 2024-07-23 15:14:41 +08:00
Xinyu Liu 7bb7b10a31
Merge pull request #13114 from emqx/emqx-relup-gen
feat: generate relup tarball, add relup APIs
2024-07-23 15:00:52 +08:00
Shawn 439abe430b refactor: remove relup revert callback functions 2024-07-23 11:45:55 +08:00
Shawn eb71477f43 chore: move relup_info to rel/relup 2024-07-23 09:32:54 +08:00
Thales Macedo Garitezi 99e6613713 test(rule events): add test cases for `schema.validation_failed` and `message.transformation_failed` events 2024-07-22 16:31:48 -03:00
Thales Macedo Garitezi d9832252d8 refactor: add namespace to avoid clashes with operations or other resources 2024-07-22 16:04:19 -03:00
Thales Macedo Garitezi 6a5849488c feat(cluster link): add metrics
Fixes https://emqx.atlassian.net/browse/EMQX-12627
2024-07-22 16:04:19 -03:00
Thales Macedo Garitezi 07cb147d38 fix(cluster link schema): username is not required 2024-07-22 16:04:19 -03:00
Thales Macedo Garitezi ba3cbe02e3 feat(cluster link api): add status to responses
Fixes https://emqx.atlassian.net/browse/EMQX-12627
2024-07-22 16:04:19 -03:00
Thales Macedo Garitezi 0b1f0db73c chore(cluster link): refactor HTTP API for CRUD operations
Fixes https://emqx.atlassian.net/browse/EMQX-12627
2024-07-22 16:04:19 -03:00
Thales Macedo Garitezi 7ca5205f3f feat: add `peername` to rule events that already have `peerhost`
Fixes https://emqx.atlassian.net/browse/EMQX-12342
2024-07-22 16:01:30 -03:00
Ivan Dyachkov d1c218303d
Merge pull request #13498 from emqx/sync-release-57-20240722-022026
Sync release-57
2024-07-22 19:30:10 +02:00
Thales Macedo Garitezi d7112921a6 docs: remove `enable` from config examples
Fixes https://emqx.atlassian.net/browse/EMQX-12730
2024-07-22 13:26:53 -03:00
Thales Macedo Garitezi 69f5b6fa6c chore: hide `enable` fields from docgen
Fixes https://emqx.atlassian.net/browse/EMQX-12730
2024-07-22 13:26:53 -03:00
Thales Macedo Garitezi 8ae54ac325 fix(connector resource): use configuration `resource_opts` for health check interval when starting up
Fixes https://emqx.atlassian.net/browse/EMQX-12738
2024-07-22 11:34:10 -03:00
Thales Macedo Garitezi 220fbe8a0a test: fix flaky test 2024-07-22 09:44:51 -03:00
zhongwencool a2bed1efb8
Merge pull request #13480 from zhongwencool/rule-engine-log-tag
feat: Rule engine log tag
2024-07-22 16:47:13 +08:00
zhongwencool 57b67ebb37
Merge pull request #13481 from zhongwencool/match_rule_error
chore: add authz tag to match_rule_error log
2024-07-22 16:46:49 +08:00
Shawn 862336a2cb feat: hide relup plugins from APIs and CLIs 2024-07-22 16:07:50 +08:00
id ed2fab51e9 Merge remote-tracking branch 'origin/release-57' into sync-release-57-20240722-022026 2024-07-22 02:20:27 +00:00
Thales Macedo Garitezi 65544f34ec chore: bump hocon -> 0.43.2 2024-07-19 17:25:18 -03:00
Thales Macedo Garitezi 8d535bbd24
Merge pull request #13464 from thalesmg/20240712-m-res-manager-shutdown-logs
chore: attempt to reduce race condition supervisor noproc shutdown error logs
2024-07-19 14:57:56 -03:00
Thales Macedo Garitezi d7e72808a8 docs: add changelog 2024-07-19 14:43:55 -03:00
Thales Macedo Garitezi 4d174b8678 feat(sources & actions api): add dependent rules to response
Fixes https://emqx.atlassian.net/browse/EMQX-12654
2024-07-19 14:43:55 -03:00
Thales Macedo Garitezi b5231c29e3 feat(bin/emqx): add `-init_debug` system arg when `DEBUG=2` 2024-07-19 12:32:37 -03:00
Thales Macedo Garitezi eb2d3a3b7e chore: attempt to reduce race condition supervisor shutdown errors
Fixes https://emqx.atlassian.net/browse/EMQX-12442

e.g.:
```
2024-05-23T08:52:39.811845+00:00 [error] Supervisor: {local,emqx_resource_manager_sup}. Context: shutdown_error. Reason: noproc. Offender: id=<<99, 101, 110, 115, 111, 114, 101, 100>>,pid=<0.7752.1030>.
```

It could be just a race condition, as it seems to be the case for resource manager: i) a call is made to the process to stop it; ii) the call times out; iii) the after clause ends up calling supervisor:terminate_child; iv) while the supervisor is finding the child to terminate, the process actually finishes terminating, and the supervisor receives a noproc reason back.
2024-07-19 10:57:00 -03:00
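The four-step sequence in the commit body above is easy to reproduce in miniature. The sketch below is hedged: the function, supervisor, and child names are invented; it only illustrates how a timed-out stop call followed by `supervisor:terminate_child/2` can race with the child finishing its own shutdown and yield the `noproc` report.

```
%% Sketch of the race described above (all names are illustrative):
%% i) ask the child to stop; ii) the call times out; iii) fall back to the
%% supervisor; iv) the child dies in the meantime, so the supervisor logs
%% Context: shutdown_error, Reason: noproc.
stop_resource(SupRef, ChildId, ChildPid) ->
    try
        gen_server:call(ChildPid, stop, 1000)
    catch
        exit:{timeout, _} ->
            %% The child may terminate on its own between the timeout and
            %% this call, which is what produces the noproc shutdown_error.
            supervisor:terminate_child(SupRef, ChildId)
    end.
```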
Thales Macedo Garitezi ae828e8cfb feat(connectors api): add dependent actions and sources to response
Fixes https://emqx.atlassian.net/browse/EMQX-12654
2024-07-19 10:33:48 -03:00
Thales Macedo Garitezi 464e202742
Merge pull request #13488 from thalesmg/20240718-m-mix-fix-machine-dep
chore(new mix build): fix app dependency for release
2024-07-19 09:21:02 -03:00
Andrew Mayorov b7200656a5
Merge pull request #13486 from keynslug/fix/ci/ds-raft-flaky-next
test(dsraft): attempt to stabilize flaky testcases
2024-07-19 12:15:36 +02:00
Shawn fc3405fe4c fix: bp_api for relup 2024-07-19 17:35:36 +08:00
Shawn f11dfce292 ci: suppress dialyzer checks for quicer and odbc types 2024-07-19 17:11:49 +08:00
Shawn c61828460a chore: emqx_utils_api:with_node/2 support simple http-code 2024-07-19 15:02:14 +08:00
Shawn 4d25f28bb2 fix: dialyzer checks 2024-07-19 14:19:50 +08:00
Shawn 5c2a7dfdfa fix: rename relup dir to relup_info to avoid tgz failure 2024-07-19 12:17:12 +08:00
Shawn 3ad7dc262b fix: some sanity-checks 2024-07-19 11:38:44 +08:00
Shawn 3c8ef35b18 fix: show relup status even if no packages installed 2024-07-19 11:20:37 +08:00
Thales Macedo Garitezi 01883e9759 chore(new mix build): fix app dependency for release 2024-07-18 11:33:02 -03:00
Thales Macedo Garitezi ca47e4768d
Merge pull request #13485 from thalesmg/20240717-r57-rm-dead-code-purge-proto
chore: remove dead code
2024-07-18 09:22:27 -03:00
Shawn 79b65a28c1 chore: use emqx-relup 0.1.0 2024-07-18 18:54:10 +08:00
Andrew Mayorov 3a893626b8
Merge pull request #13474 from keynslug/ft/EMQX-12309/ds-cas-api
feat(ds): allow isolated batches with preconditions
2024-07-18 12:52:13 +02:00
Shawn 2008130071 feat: add HTTP APIs for relup 2024-07-18 18:48:38 +08:00
Shawn c6b02bc13f feat: support starting emqx from relup dir
We put all of the unpacked files into the `relup` dir, and warn the user if booting from it
2024-07-18 18:47:27 +08:00
Ivan Dyachkov 4a04ffdca1
Merge pull request #13483 from id/20240717-sync-release-57
sync release-57
2024-07-18 10:41:49 +02:00
Ivan Dyachkov c2d49ff34f chore(rmq_tests): fix rabbitmq tests
Co-authored-by: Ilya Averyanov <av@rubybox.dev>
2024-07-18 08:34:00 +02:00
zhongwencool ac52bf39ce
Merge pull request #13443 from zhongwencool/cluster-link-cli-load
fix: update cluster.links via cli
2024-07-18 09:13:59 +08:00
Andrew Mayorov 0e545ffcec
feat(ds): add dedicated `#message_matcher{}` for preconditions 2024-07-17 21:27:17 +02:00
Andrew Mayorov 2e89656a90
test(dsraft): start `t_replication_transfers_snapshots` from stable state 2024-07-17 20:01:55 +02:00
Andrew Mayorov 466fa41ec3
fix(dsraft): rely on last resort timeout with unresponsive replicas
This simplifies the shard transition scheduling logic and makes it less
prone to races.
2024-07-17 19:24:38 +02:00
Thales Macedo Garitezi 93c725732c chore: remove dead code 2024-07-17 12:56:45 -03:00
Thales Macedo Garitezi 4edbcc55e7
Merge pull request #13463 from thalesmg/20240712-m-gprodu-backoff-retry
feat(gcp pubsub producer): retry on 502 and 503 http status code responses
2024-07-17 12:49:27 -03:00
Thales Macedo Garitezi cd8bf2725a
Merge pull request #13453 from thalesmg/20240711-r57-mt-fixes
batch of message transformation fixes
2024-07-17 12:45:19 -03:00
Andrew Mayorov 0c05b3f019
fix(ds): make conditionals less confusing 2024-07-17 16:23:41 +02:00
Andrew Mayorov 78fe9304be
Merge pull request #13462 from keynslug/fix/ci/flaky-ds-raft
fix(dsraft): preserve pending replica set transitions
2024-07-17 16:11:59 +02:00
Andrew Mayorov 14022aded1
feat(ds): allow isolated batches with preconditions
Namely, single message deletions and preconditions that can be used to
build complex "compare-and-swap"-style operations. Also allow user to
declare that atomic batches support is needed for a DB.
2024-07-17 15:57:17 +02:00
Andrew Mayorov 02e1007a16
feat(dslocal): implement `force_monotonic_timestamps => false` 2024-07-17 15:49:50 +02:00
zhongwencool 3381eecd6f chore: apply code review 2024-07-17 21:27:07 +08:00
zhongwencool 3c832db13d test: test cluster.links reloaded 2024-07-17 21:06:48 +08:00
zhongwencool 937fb153c2 fix: fill_default/3 should populate default values for all parameters except the 'ds' 2024-07-17 21:06:48 +08:00
zhongwencool 7b6b9580c8 test: add test for updating cluster.links 2024-07-17 21:06:48 +08:00
zhongwencool 2783192f77 fix: update cluster.links via cli 2024-07-17 21:06:48 +08:00
zhongwencool 083537daa3 fix: retry not_found if the conf file does not exist 2024-07-17 21:06:48 +08:00
Andrew Mayorov ae3812da85
feat(ds): allow to turn monotonic timestamps off for DB
That tells implementation how to assign timestamps to messages. Current
implicit default is now `force_monotonic_timestamps => true`.
2024-07-17 14:40:23 +02:00
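The commit above introduces a per-DB switch for timestamp assignment, with `force_monotonic_timestamps => true` remaining the implicit default. A hedged example of passing the new flag when opening a DB follows; the backend name is an assumption and other mandatory options are omitted, so treat it as a sketch rather than a complete `emqx_ds:open_db/2` call.

```
%% Hedged sketch -- backend choice and the remaining create_db options
%% (storage layout, replication settings, etc.) are assumptions/omitted:
%% open a durable-storage DB with monotonic timestamp enforcement disabled,
%% i.e. message timestamps are taken as-is instead of being forced to increase.
emqx_ds:open_db(example_db, #{
    backend => builtin_local,
    force_monotonic_timestamps => false
}).
```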
zmstone 4c51cfdb68
Merge pull request #13445 from tigercl/docs/improve-desc
docs: improve the desc of configuration items
2024-07-17 14:37:12 +02:00
Andrew Mayorov 6b130c6422
fix(dsraft): preserve pending replica set transitions
Otherwise, information about pending replica set transitions taking a
long time to complete could be lost on subsequent target set changes and
node crashes.
2024-07-17 12:17:07 +02:00
zhouzb 67880ab6a0 docs: improve the desc of configuration items
docs: fix typos

docs: better line breaks

docs: improve desc

docs: fix typo

docs: update desc for shared_subscription_strategy_enum

docs: fix typo

docs: update desc

docs: fix typo
2024-07-17 17:39:36 +08:00
Ivan Dyachkov 292b331064 Merge remote-tracking branch 'upstream/release-57' into 20240717-sync-release-57 2024-07-17 11:29:25 +02:00
zhongwencool 52031441cf chore: add authz tag to match_rule_error log 2024-07-17 12:21:32 +08:00
zhongwencool 604cff4887 feat: add rule tag to rule_engine log 2024-07-17 12:15:57 +08:00
lafirest 6697035812
Merge pull request #13441 from lafirest/feat/coap
feat(coap): use content-sensitive udp proxy for coap
2024-07-17 10:01:51 +08:00
Thales Macedo Garitezi 1ad02a11e2
Merge pull request #13455 from thalesmg/20240711-m-mix-umbrella-part-III-no-ci
sync new mix build work to master
2024-07-16 14:41:10 -03:00
Andrew Mayorov d04915d6a6
test(dsraft): increase `ra_server` logging level for debugging 2024-07-16 15:54:49 +02:00
Andrew Mayorov 78bb102311
test(dsraft): attempt to start select testcases from stable state 2024-07-16 15:54:49 +02:00
ieQu1 706cab3c86
Merge pull request #13467 from ieQu1/dev/optimize-connection-process_msg
fix(connection): Make process_msg function tail-recursive
2024-07-15 17:00:50 +02:00
Thales Macedo Garitezi 4a08bfc93f feat(mix ct): improve failure logging 2024-07-15 09:26:04 -03:00
Thales Macedo Garitezi 0555a8ec61 fix(mix): bizarre compilation order bug with `emqx` profile
For some bizarre reason, if the `:apps` key is defined in the `project()` callback in the
root umbrella `mix.exs`, it messes up the compilation order that mix follows when
compiling the project from scratch.

Specifically, in the `emqx` profile, even though `:emqx_utils` is an explicit dependency
of `:emqx_ds_builtin_local`, mix insisted in compiling the latter before the former, and
failing, obviously.  Removing the explicit `:apps` from the project definition solved
this.

🫠
2024-07-15 09:26:04 -03:00
Thales Macedo Garitezi 02a0ccfdd1 ci: preparations for new mix build 2024-07-15 09:26:04 -03:00
Thales Macedo Garitezi 9a003ee3cf feat(mix eunit): add support for filtering test cases 2024-07-15 09:26:04 -03:00
Thales Macedo Garitezi bbd51bdf18 feat(mix ct): add support for specifying group paths 2024-07-15 09:26:04 -03:00
Thales Macedo Garitezi 39c82fbe89 feat(mix): always run merge-config before release 2024-07-15 09:26:04 -03:00
Thales Macedo Garitezi 70786d6aca test: fix suite apps 2024-07-15 09:26:04 -03:00
Thales Macedo Garitezi 066fd0481b feat(mix): compile asn1 files 2024-07-15 09:26:04 -03:00
Thales Macedo Garitezi 9e4a84cf76
Merge pull request #13442 from thalesmg/20240709-r57-decouple-connector-action-hc
fix(resource manager): disentangle connector and channel health check frequencies
2024-07-15 09:12:07 -03:00
firest 269f6b29cc chore: update changes 2024-07-15 11:26:55 +08:00
firest ec183f1d4c test(coap): fix ci errors 2024-07-15 10:52:37 +08:00
ieQu1 46c2c75b7b
fix(connection): Make process_msg function tail-recursive 2024-07-14 06:00:00 +02:00
Thales Macedo Garitezi 0e57b39cf2 feat(gcp pubsub producer): retry on 502 and 503 http status code responses
Fixes https://emqx.atlassian.net/browse/EMQX-12625
2024-07-12 15:29:59 -03:00
Andrew Mayorov 2401a2fb80
test(dsraft): run `t_join_leave_errors` case in tracing context 2024-07-12 18:28:24 +02:00
Thales Macedo Garitezi 96c9020727 chore: improve protobuf decoding error messages
Fixes https://emqx.atlassian.net/browse/EMQX-12677
2024-07-12 13:27:30 -03:00
Andrew Mayorov af81800aec
chore(dsraft): log a bit more informative messages in shard allocator 2024-07-12 18:24:58 +02:00
Andrew Mayorov 8e8b382ec0
chore(dsraft): provide more details when replica is unready 2024-07-12 18:23:23 +02:00
Andrew Mayorov 70a760850f
chore(dsraft): correct comment spelling errors 2024-07-12 15:27:29 +02:00
Andrew Mayorov 205ad507ea
test(dsraft): attempt to ensure testcases start from stable state
Where "stable state" is currently defined as "everyone knows and agrees
on the current leader".
2024-07-12 15:26:00 +02:00
Ivan Dyachkov ffa69df6f8
Merge pull request #13461 from id/20240712-ci-add-sync-release-branch-workflow
ci: add sync-release-branch workflow
2024-07-12 13:13:00 +02:00
Ivan Dyachkov e07d96e4d8 ci: add sync-release-branch workflow 2024-07-12 12:52:16 +02:00
Ilia Averianov 82e723bd18
Merge pull request #13459 from savonarola/0712-reduce-flackyness
chore(mgmt): reduce test flakiness
2024-07-12 13:14:14 +03:00
Ilya Averyanov 9ca8aeb155 chore(mgmt): reduce test flakiness 2024-07-12 12:10:49 +03:00
firest 854754eb60 feat(coap): use content-sensitive udp proxy for coap 2024-07-12 16:23:46 +08:00
lafirest 4e3095b1c4
Merge pull request #13458 from lafirest/fix/banned_bf
fix(banned): let the bootfile of banned be optional
2024-07-12 15:00:43 +08:00
firest 83cc3ffeb0 fix(banned): let the bootfile of banned be optional 2024-07-12 13:58:14 +08:00
lafirest 1b7d23cef4
Merge pull request #13451 from lafirest/fix/def_banned_file
fix: do not convert an empty file name to an empty list
2024-07-12 13:50:32 +08:00
Thales Macedo Garitezi 2816170e9d chore: add `$events.message_transformation_failed` to rule engine tester
Fixes https://emqx.atlassian.net/browse/EMQX-12679
2024-07-11 17:52:08 -03:00
Thales Macedo Garitezi 5f595966d8 chore(message transformation): allow empty operation list
Fixes https://emqx.atlassian.net/browse/EMQX-12682
2024-07-11 17:52:08 -03:00
Thales Macedo Garitezi 5be654e31e
Merge pull request #13456 from thalesmg/20240711-r57-minor-schema-registry-api-nit
fix(schema registry): handle large names during lookup
2024-07-11 17:50:58 -03:00
Thales Macedo Garitezi 04b547d6f5 fix(schema registry): handle large names during lookup
Fixes https://emqx.atlassian.net/browse/EMQX-12692
2024-07-11 14:35:31 -03:00
Thales Macedo Garitezi 21313c766d ci: add dialyzer mix task 2024-07-11 14:19:23 -03:00
Thales Macedo Garitezi f3c6d10f76 fix(mix): fix compile paths and deps 2024-07-11 14:19:23 -03:00
Thales Macedo Garitezi 01d89be743 feat(message transformation): add timestamp and pub_props fields to read context
Fixes https://emqx.atlassian.net/browse/EMQX-12684

Fixes https://emqx.atlassian.net/browse/EMQX-12678
2024-07-11 12:15:31 -03:00
Thales Macedo Garitezi 44e4b3616d feat(variform): allow hyphens in identifiers
Fixes https://emqx.atlassian.net/browse/EMQX-12683
2024-07-11 12:15:23 -03:00
Kjell Winblad a4cc3ba9e8
Merge pull request #13375 from kjellwinblad/kjell/fix_connector_lister_speed_limit_clearing/EMQX-12514
fix: default value for max_conn_rate etc should be set to infinity
2024-07-11 16:36:01 +02:00
Ivan Dyachkov bf2abba17a
Merge pull request #13448 from id/20240711-fix-dashboard-tests-again
ci: fix dashboard tests (again)
2024-07-11 12:36:25 +02:00
firest d9b5c5863b fix: do not convert an empty file name to an empty list 2024-07-11 18:12:38 +08:00
lafirest c9e12f30cd
Merge pull request #13444 from lafirest/fix/oidc
fix(oidc): Avoid crashes and avoid deleting jwks on update
2024-07-11 17:55:50 +08:00
Ivan Dyachkov 3004e32473 ci: fix dashboard tests (again) 2024-07-11 11:29:29 +02:00
zmstone 7664b06e98
Merge pull request #13434 from zmstone/0704-refine-rpc-config
0704 refine rpc config
2024-07-11 10:25:45 +02:00
ieQu1 02ce7e1b07
Merge pull request #13446 from ieQu1/dev/ds-build-platform
chore(ds): Support platform profile
2024-07-10 13:26:01 +02:00
ieQu1 4825079964
chore(ds): Support platform profile 2024-07-10 12:03:23 +02:00
firest b0e3e405cf fix(oidc): Avoid crashes and avoid deleting jwks on update 2024-07-10 15:22:43 +08:00
JimMoen 44d533fe6d
Merge pull request #13432 from JimMoen/0705-fix-jwt-pem-check
fix: create authn jwt with bad public key
2024-07-10 10:33:54 +08:00
zmstone 917df38a07 docs: add changelog for PR 13434 2024-07-09 22:26:30 +02:00
zmstone 7a23ae7b4d refactor: expose only server_port for rpc
previously, there were 4 port configs:
- tcp_server_port
- ssl_server_port
- tcp_client_port
- ssl_client_port
2024-07-09 22:26:29 +02:00
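The refactor above collapses the four RPC port settings listed in the commit body into a single exposed key. As an illustration only (the key name comes from the commit title; the port value is an example, not a documented default), the resulting emqx.conf shape would be along these lines:

```
rpc {
  # single exposed knob after the refactor; replaces tcp_server_port,
  # ssl_server_port, tcp_client_port and ssl_client_port
  server_port = 5369
}
```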
zmstone ee13773496 refactor: rename rpc.tcp_client_num to rpc.client_num
tcp_client_num is kept as an alias
2024-07-09 22:26:29 +02:00
Thales Macedo Garitezi 48e604bda8 fix(mix grpc): include default erlc options 2024-07-09 15:56:30 -03:00
Thales Macedo Garitezi 818070ad44 test(mix): add integration test path 2024-07-09 15:56:30 -03:00
Thales Macedo Garitezi 5279ad76be fix(grpc compiler): unload apps to avoid side effects 2024-07-09 15:56:30 -03:00
Thales Macedo Garitezi b91515b131 fix(schema registry mix): gpb is a runtime dep 2024-07-09 15:56:30 -03:00
Thales Macedo Garitezi 6d94809950
Merge pull request #13415 from thalesmg/20240703-m-couchbase-action
feat: implement couchbase connector and action
2024-07-09 15:53:11 -03:00
Thales Macedo Garitezi 50e6ee4c88
Merge pull request #13435 from thalesmg/20240708-r57-mt-breaking-changelog
docs: add breaking change entry
2024-07-09 15:53:02 -03:00
Thales Macedo Garitezi 3c370a90aa
Merge pull request #13436 from thalesmg/20240708-r57-custom-headers-jwks
feat(jwks): allow specifying custom request headers
2024-07-09 15:52:44 -03:00
ieQu1 92dc059908
Merge pull request #13370 from ieQu1/dev/skip-streams
New durable storage layout with explicit index for LTS wildcards
2024-07-09 20:27:21 +02:00
Thales Macedo Garitezi 9f8a1885a7 fix(resource manager): disentangle connector and channel health check frequencies
Fixes https://emqx.atlassian.net/browse/EMQX-12674
2024-07-09 14:53:39 -03:00
Thales Macedo Garitezi d25c4ba06f
Merge pull request #13421 from thalesmg/20240705-r57-docs-mt-api-examples
docs(message transformation): add api examples
2024-07-09 10:16:53 -03:00
ieQu1 3721be65ee
fix(ds): Improve comments 2024-07-09 13:15:15 +02:00
ieQu1 d7732a6aac
test(ds): Attempt to stabilize a flaky test 2024-07-09 13:15:15 +02:00
ieQu1 e70c1cfea3
test(ds): Improve stability of replication test suite 2024-07-09 13:15:15 +02:00
ieQu1 dc4ae82798
test(ds): Add message ID 2024-07-09 13:15:14 +02:00
ieQu1 d1b574a67e
perf(dslocal): Run heavy-duty operations in a temporary process 2024-07-09 13:15:14 +02:00
ieQu1 661f79544b
fix(ds): Optimize hot loop of skipstream storage layout 2024-07-09 13:15:14 +02:00
ieQu1 23dafbb03b
feat(ds): Add a benchmarking tool for storage efficiency analysis 2024-07-09 13:15:14 +02:00
ieQu1 afeb2ab8aa
feat(ds): Add metrics for skipstream layout 2024-07-09 13:15:14 +02:00
ieQu1 b68ebb9a73
test(dsrepl): Generalize tests to use different storage layouts 2024-07-09 13:15:14 +02:00
ieQu1 8c5e4a2376
test(ds): Generalize storage layout test suite for different layouts 2024-07-09 13:15:14 +02:00
ieQu1 086e7256f5
feat(ds): Add configuration schema for skipstream LTS layout 2024-07-09 13:15:14 +02:00
ieQu1 a4642d4d06
feat(ds): Add a new storage layout engine: "skipstream"
This layout is based on LTS as well, but it uses separate index
streams for constrained replay of streams with learned wildcards
2024-07-09 13:15:14 +02:00
ieQu1 de48077ac4
test(ds): Add new helper functions
- Improve message comparison
- Add set operations
2024-07-09 13:15:14 +02:00
ieQu1 210556e545
feat(ds): Generalize value serialization
- Add a new utility module for message serialization
- Add experimental serializer based on ASN.1
2024-07-09 13:15:14 +02:00
ieQu1 843973ef32
fix(ds): bitfield_lts: static_key_size -> static_key_bits 2024-07-09 13:15:14 +02:00
ieQu1 f84fb34692
feat(ds_lts): New APIs: info, reverse lookups and topic compression 2024-07-09 13:15:14 +02:00
ieQu1 eb80402ccb
fix(ds): Improve typespecs and descriptions in storage_layer 2024-07-09 13:15:14 +02:00
ieQu1 71dad0242e
docs(ds): Move Raft-related parts to emqx_ds_builtin_raft README 2024-07-09 13:15:14 +02:00
ieQu1 afe1c5617d
refactor(ds): Rename macros for bitfield_lts metrics 2024-07-09 13:15:14 +02:00
ieQu1 0f2c19b656
refactor(ds): Move end_of_stream detection logic for delete_next 2024-07-09 13:15:14 +02:00
ieQu1 b565976794
fix(ds): Fix hashing of empty wildcard topic levels in bitfield_lts 2024-07-09 13:15:13 +02:00
zmstone 91fd01ed21
Merge pull request #13411 from Altair-Bueno/master
new(helm): websocket ingress (fixes #13309)
2024-07-09 11:39:07 +02:00
Thales Macedo Garitezi 0d1eaba82e
Merge pull request #13437 from thalesmg/20240708-m-fix-includes
fix(ds builtin local): use `-include_lib` instead of relative path
2024-07-08 18:40:11 -03:00
Thales Macedo Garitezi f00bb383d4 fix(ds builtin local): use `-include_lib` instead of relative path 2024-07-08 16:57:55 -03:00
Thales Macedo Garitezi 811184ddad feat(jwks): allow specifying custom request headers
Fixes https://emqx.atlassian.net/browse/EMQX-12655
2024-07-08 15:40:52 -03:00
Thales Macedo Garitezi 893630aee3 docs: add breaking change entry
Fixes https://github.com/emqx/emqx/pull/13420#issuecomment-2213957235
2024-07-08 10:18:12 -03:00
Thales Macedo Garitezi d34fc7a03a
Merge pull request #13420 from thalesmg/20240705-r57-fix-mt-empty-topics
fix(schema validation & message transformation): forbid empty topic filter list
2024-07-08 10:15:08 -03:00
Thales Macedo Garitezi 166f5e5f12
Merge pull request #13426 from thalesmg/20240705-r57-test-flaky-plugin-start-enabled
test(plugins): fix flaky test
2024-07-08 09:19:53 -03:00
zhongwencool fd18e5feb3
Merge pull request #13202 from zhongwencool/cluster-fix-cli
feat: add cluster fix command
2024-07-08 19:08:34 +08:00
zhongwencool 820789a09f fix: redact status when mark_fix_log begin 2024-07-08 17:32:45 +08:00
zhongwencool 457ea93570 test: add cluster_sync cli test 2024-07-08 17:04:41 +08:00
zhongwencool f490a0cba2 feat: don't reset tnx_id when cluster_fix 2024-07-08 17:04:41 +08:00
zhongwencool 298211d101 chore: apply suggestions from code review
Co-authored-by: zmstone <zmstone@gmail.com>
2024-07-08 17:04:41 +08:00
zhongwencool bdf3fc63a6 chore: add config leader to suggestion 2024-07-08 17:04:41 +08:00
zhongwencool 22fc3c49cc chore: combine some common code into one function 2024-07-08 17:04:41 +08:00
zhongwencool 5b105fcdbb chore: move emqx_conf_proto_v3 to emqx_conf_proto_v4 2024-07-08 17:04:41 +08:00
zhongwencool 3ed4340145 test: fix cluster_rpc test failed 2024-07-08 17:04:41 +08:00
zhongwencool 2069910ad1 feat: add cluster fix command 2024-07-08 17:04:41 +08:00
JimMoen ae3b8fe146
test: create jwt authenticator with bad public key 2024-07-08 16:52:18 +08:00
JimMoen f76444fbf8
fix: create authn jwt with bad public key 2024-07-08 16:52:18 +08:00
Shawn 5fca0a16f9 feat: rename emqx_relup to emqx_post_upgrade 2024-07-08 10:33:09 +08:00
Shawn 92594d042b feat: add some relup examples 2024-07-08 10:33:09 +08:00
Shawn e9163f2752 feat: generate relup tarball
To generate a tarball, tag the release and then:

```
make emqx-enterprise-relup
```
2024-07-08 10:33:09 +08:00
zhongwencool 29d7a511f1
Merge pull request #13419 from zhongwencool/port-pr
Port: some minor bug fixes from master
2024-07-06 14:42:51 +08:00
Thales Macedo Garitezi f9b6ae0c1a
Merge pull request #13422 from thalesmg/20240705-r57-max-heap-size-0
fix: handle `max_heap_size` = 0
2024-07-05 15:58:06 -03:00
Thales Macedo Garitezi f1b4467fe1 test(plugins): fix flaky test
The hypothesis is that both peer nodes were using the same directory and stepping on each
other's toes.
2024-07-05 14:17:42 -03:00
Thales Macedo Garitezi 70fab51354 fix: handle `max_heap_size` = 0
Fixes https://github.com/emqx/emqx/issues/13417

Fixes https://emqx.atlassian.net/browse/EMQX-12659
2024-07-05 13:10:37 -03:00
Thales Macedo Garitezi 36ee7bed77 docs(message transformation): add api examples
Fixes https://emqx.atlassian.net/browse/EMQX-12645
2024-07-05 09:59:27 -03:00
Thales Macedo Garitezi e7351d949d fix(schema validation): forbid empty topic filter list 2024-07-05 09:51:43 -03:00
Thales Macedo Garitezi e99fee68c0 fix(message transformation): forbid empty topic filter list
Fixes https://emqx.atlassian.net/browse/EMQX-12646
2024-07-05 09:49:03 -03:00
zhongwencool 7d851872ec chore: update emqx_module's app version 2024-07-05 19:21:28 +08:00
zhongwencool 9ffe6420c2 chore: add changelog for 13419 2024-07-05 17:41:02 +08:00
zhongwencool d94fcb9cfd test: fix api_config SUITE failed 2024-07-05 17:34:39 +08:00
zhongwencool ba3097dc56 fix: observer command crash when can't find object code 2024-07-05 17:34:33 +08:00
zhongwencool f0a1d785ca fix: don't allow set active_n to negative int 2024-07-05 17:34:25 +08:00
zhongwencool 8aab919f74 fix: load bad configs return unknown msg 2024-07-05 17:34:17 +08:00
zhongwencool b4cffc581b fix: ws/wss's max_frame_size should > 0 2024-07-05 17:34:09 +08:00
JimMoen c7f4e85760
Merge pull request #13418 from JimMoen/fix-docker-build-warning
build: avoid warnings during docker build
2024-07-05 17:03:14 +08:00
Altair-Bueno f2f8c2ae92 fix(helm): Default to /mqtt on ingress as Rory-Z suggestion 2024-07-05 09:09:30 +02:00
JimMoen 3e69a52596
build: avoid warnings during docker build
- See also: https://docs.docker.com/reference/build-checks/from-as-casing/
2024-07-05 14:06:58 +08:00
zhongwencool 094259f444
Merge pull request #13408 from zhongwencool/password-crash
chore: improve auth error for invalid salt/password type
2024-07-05 11:44:21 +08:00
zhongwencool 755d6c9e0f chore: add changelog for 13398 and 13408 2024-07-05 10:21:04 +08:00
JimMoen 9d0b5a9bc6
Merge pull request #13412 from JimMoen/fix-cert-notafter-badmatch
fix: obtain cert expiry epoch failed due to formatted `generalTime`
2024-07-05 10:13:22 +08:00
zhongwencool d3d3303dcb chore: improve auth error for invalid salt/password type 2024-07-05 10:12:24 +08:00
lafirest aa84ca5a88
Merge pull request #13386 from lafirest/feat/banned_boot_57
feat(banned): add a bootstrap file for banned
2024-07-05 08:47:40 +08:00
Thales Macedo Garitezi c4dd167cb9 feat: implement couchbase connector and action
Fixes https://emqx.atlassian.net/browse/EMQX-12545
2024-07-04 17:51:59 -03:00
Thales Macedo Garitezi b333babb4c
Merge pull request #13401 from thalesmg/20240703-r57-authz-ignore-api-metrics
fix(authz api): add new `ignore` metric to status response
2024-07-04 17:07:08 -03:00
JimMoen d84d31cbc5
test: cert expiry epoch with `generalTime` formatted 2024-07-05 01:47:13 +08:00
Kjell Winblad d206d24975 fix: only set default for max_conn_rate and update test case
This reverts the change in commit e291dcd for all listener "short
path fields" except max_conn_rate, so they no longer have a
default value. It also updates a test case that assumed no listener
config is created by default, which is no longer true now that
max_conn_rate has a default value.
2024-07-04 14:32:10 +02:00
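As a hedged illustration of the listener "short path fields" discussed in this commit (the listener name and values are placeholders, not taken from the commit):
```
# emqx.conf (illustrative)
listeners.tcp.default {
  bind = "0.0.0.0:1883"
  max_conn_rate = "1000/s"   # the only short-path field that keeps a default after this change
  # messages_rate and bytes_rate no longer have defaults
}
```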
Thales Macedo Garitezi f758fd9279
Merge pull request #13405 from thalesmg/20240703-r57-test-flaky-table-removed-postgres
test(postgres bridge): attempt to stabilize flaky test
2024-07-04 09:25:15 -03:00
JimMoen 8c6cd69caa
fix: obtain cert expiry epoch failed due to formatted `generalTime` 2024-07-04 18:23:56 +08:00
Altair-Bueno 20be0df62d feat(helm): websocket ingress
Closes: #13309
2024-07-04 10:52:42 +02:00
zhongwencool ac77b8a131
Merge pull request #13403 from zmstone/0703-upgrade-hocon-0.43.1
0703 upgrade hocon 0.43.1
2024-07-04 09:30:48 +08:00
firest a912751458 chore: update changes 2024-07-04 09:13:50 +08:00
firest 913e0ce18b feat(banned): add a bootstrap file for banned 2024-07-04 09:12:12 +08:00
zmstone 947cddb2eb test: invalid map key is caught by hocon
now that hocon has built-in map key validation,
some of the resource name validations are caught by hocon
2024-07-03 23:00:18 +02:00
zmstone 5446bc305f docs: add changelog for PR 13403 2024-07-03 23:00:18 +02:00
zmstone eaaee725c2 fix: upgrade to hocon 0.43.1
included 3 changes since 0.42.2

- allow validation of map keys
- improve crash stacktrace report
- avoid dumping array environment variable values
2024-07-03 23:00:14 +02:00
zmstone 7ee5b90084
Merge pull request #13400 from zmstone/0605-ACL-rules-in-http-authentication-response
feat(auth): support HTTP authn return ACL rules
2024-07-03 21:51:07 +02:00
Thales Macedo Garitezi 72579f9014 test(postgres bridge): attempt to stabilize flaky test
```
%%% emqx_bridge_pgsql_SUITE ==> tcp.sync.with_batch.t_table_removed: FAILED
%%% emqx_bridge_pgsql_SUITE ==> {{panic,
     #{msg => "Unexpected result",
       result =>
           {run_stage_failed,exit,
               {test_case_failed,
                   "unexpected result: {error,{recoverable_error,sync_required}}"},
               [{emqx_bridge_pgsql_SUITE,'-t_table_removed/1-fun-3-',3,
                    [{file,
                         "/emqx/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl"},
                     {line,822}]},
```

```
Error: -03T17:52:54.046809+00:00 [error] Generic server <0.352770.0> terminating. Reason: {'module could not be loaded',[{undefined,handle_message,[90,<<"I">>,{state,ssl,{sslsocket,{gen_tcp,#Port<0.1671>,tls_connection,undefined},[<0.352774.0>,<0.352773.0>]},<<>>,{336,-2093820527},on_message,{codec,#{},[null,undefined],{oid_db,#{16 =>
...
2024-07-03T17:52:54.075446+00:00 [critical] Run stage failed: exit:{test_case_failed,"unexpected result: {error,\n                    {resource_error,\n                     #{reason => exception,\n                       msg =>\n                        #{error =>\n                           {exit,\n                            {{undef,\n                              [{undefined,handle_message,\n                                [90,<<\"I\">>,\n                                 {state,ssl,\n                                  {sslsocket,\n                                   {gen_tcp,#Port<0.1671>,tls_connection,\n
```
2024-07-03 15:05:00 -03:00
zmstone 9194756963 feat(auth): support HTTP authn return ACL rules 2024-07-03 15:37:11 +02:00
Thales Macedo Garitezi c3579f338b fix(authz api): add new `ignore` metric to status response
Fixes https://emqx.atlassian.net/browse/EMQX-12411
2024-07-03 10:16:18 -03:00
Kjell Winblad 82bb03a2a3 docs: add change log entry 2024-07-01 13:30:51 +02:00
Kjell Winblad e291dcdd18 fix: default value for max_conn_rate etc should be set to infinity
Before this commit, the fields max_conn_rate,
messages_rate and bytes_rate had no default value. This is fixed by setting the
default to infinity. This breaks the corresponding dashboard
fields (they cannot be edited), so the dashboard also needs to be
updated.

Fixes:
https://emqx.atlassian.net/browse/EMQX-12514
2024-07-01 13:16:26 +02:00
629 changed files with 20977 additions and 3899 deletions

View File

@ -0,0 +1,30 @@
version: '3.9'
services:
couchbase:
container_name: couchbase
hostname: couchbase
image: ghcr.io/emqx/couchbase:1.0.0
restart: always
expose:
- 8091-8093
# ports:
# - "8091-8093:8091-8093"
networks:
- emqx_bridge
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8093/admin/ping"]
interval: 30s
timeout: 5s
retries: 4
environment:
- CLUSTER=localhost
- USER=admin
- PASS=public
- PORT=8091
- RAMSIZEMB=2048
- RAMSIZEINDEXMB=512
- RAMSIZEFTSMB=512
- BUCKETS=mqtt
- BUCKETSIZES=100
- AUTOREBALANCE=true

View File

@ -10,7 +10,7 @@ services:
nofile: 1024
image: openldap
#ports:
# - 389:389
# - "389:389"
volumes:
- ./certs/ca.crt:/etc/certs/ca.crt
restart: always

View File

@ -0,0 +1,61 @@
# LDAP authentication
This guide describes how to run manual tests against the default docker-compose files.
Expose the openldap container port by uncommenting the `ports` config in `docker-compose-ldap.yaml`.
To start openldap:
```
docker-compose -f ./.ci/docker-compose-file/docker-compose.yaml -f ./.ci/docker-compose-file/docker-compose-ldap.yaml up -d
```
## LDAP database
The LDAP database is populated from the files below:
```
apps/emqx_ldap/test/data/emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
apps/emqx_ldap/test/data/emqx.schema /usr/local/etc/openldap/schema/emqx.schema
```
## Minimal EMQX config
```
authentication = [
{
backend = ldap
base_dn = "uid=${username},ou=testdevice,dc=emqx,dc=io"
filter = "(& (objectClass=mqttUser) (uid=${username}))"
mechanism = password_based
method {
is_superuser_attribute = isSuperuser
password_attribute = userPassword
type = hash
}
password = public
pool_size = 8
query_timeout = "5s"
request_timeout = "10s"
server = "localhost:1389"
username = "cn=root,dc=emqx,dc=io"
}
]
```
## Example ldapsearch command
```
ldapsearch -x -H ldap://localhost:389 -D "cn=root,dc=emqx,dc=io" -W -b "uid=mqttuser0007,ou=testdevice,dc=emqx,dc=io" "(&(objectClass=mqttUser)(uid=mqttuser0007))"
```
## Example mqttx command
Each client's password hash is generated from its username.
```
# disabled user
mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0006 -P mqttuser0006
# enabled super-user
mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0007 -P mqttuser0007
```

View File

@ -221,5 +221,11 @@
"listen": "0.0.0.0:10000",
"upstream": "azurite:10000",
"enabled": true
},
{
"name": "couchbase",
"listen": "0.0.0.0:8093",
"upstream": "couchbase:8093",
"enabled": true
}
]

View File

@ -51,7 +51,7 @@ runs:
echo "SELF_HOSTED=false" >> $GITHUB_OUTPUT
;;
esac
- uses: actions/cache@ab5e6d0c87105b4c9c2047343972218f562e4319 # v4.0.1
- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
id: cache
if: steps.prepare.outputs.SELF_HOSTED != 'true'
with:

.github/workflows/.zipignore vendored Normal file
View File

@ -0,0 +1 @@
*/.github/*

View File

@ -152,7 +152,7 @@ jobs:
echo "PROFILE=${PROFILE}" | tee -a .env
echo "PKG_VSN=$(./pkg-vsn.sh ${PROFILE})" | tee -a .env
zip -ryq -x@.github/workflows/.zipignore $PROFILE.zip .
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.profile }}
path: ${{ matrix.profile }}.zip

View File

@ -163,7 +163,7 @@ jobs:
echo "PROFILE=${PROFILE}" | tee -a .env
echo "PKG_VSN=$(./pkg-vsn.sh ${PROFILE})" | tee -a .env
zip -ryq -x@.github/workflows/.zipignore $PROFILE.zip .
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.profile }}
path: ${{ matrix.profile }}.zip

View File

@ -83,7 +83,7 @@ jobs:
id: build
run: |
make ${{ matrix.profile }}-tgz
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ matrix.profile }}-${{ matrix.arch }}.tar.gz"
path: "_packages/emqx*/emqx-*.tar.gz"
@ -110,7 +110,7 @@ jobs:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
ref: ${{ github.event.inputs.ref }}
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
pattern: "${{ matrix.profile[0] }}-*.tar.gz"
path: _packages
@ -122,24 +122,25 @@ jobs:
run: |
ls -lR _packages/$PROFILE
mv _packages/$PROFILE/*.tar.gz ./
- name: Enable containerd image store on Docker Engine
run: |
echo "$(jq '. += {"features": {"containerd-snapshotter": true}}' /etc/docker/daemon.json)" > daemon.json
echo "$(sudo cat /etc/docker/daemon.json | jq '. += {"features": {"containerd-snapshotter": true}}')" > daemon.json
sudo mv daemon.json /etc/docker/daemon.json
sudo systemctl restart docker
- uses: docker/setup-qemu-action@68827325e0b33c7199eb31dd4e31fbe9023e06e3 # v3.0.0
- uses: docker/setup-buildx-action@d70bba72b1f3fd22344832f00baa16ece964efeb # v3.3.0
- uses: docker/setup-qemu-action@49b3bc8e6bdd4a60e6116a5414239cba5943d3cf # v3.2.0
- uses: docker/setup-buildx-action@988b5a0280414f521da01fcc63a27aeeb4b104db # v3.6.1
- name: Login to hub.docker.com
uses: docker/login-action@0d4c9c5ea7693da7b068278f7b52bda2a190a446 # v3.2.0
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
if: inputs.publish && contains(matrix.profile[1], 'docker.io')
with:
username: ${{ secrets.DOCKER_HUB_USER }}
password: ${{ secrets.DOCKER_HUB_TOKEN }}
- name: Login to AWS ECR
uses: docker/login-action@0d4c9c5ea7693da7b068278f7b52bda2a190a446 # v3.2.0
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567 # v3.3.0
if: inputs.publish && contains(matrix.profile[1], 'public.ecr.aws')
with:
registry: public.ecr.aws

View File

@ -51,7 +51,7 @@ jobs:
if: always()
run: |
docker save $_EMQX_DOCKER_IMAGE_TAG | gzip > $EMQX_NAME-docker-$PKG_VSN.tar.gz
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ env.EMQX_NAME }}-docker"
path: "${{ env.EMQX_NAME }}-docker-${{ env.PKG_VSN }}.tar.gz"

View File

@ -95,7 +95,7 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: ${{ matrix.profile }}-${{ matrix.os }}-${{ matrix.otp }}
@ -180,7 +180,7 @@ jobs:
--builder $BUILDER \
--elixir $IS_ELIXIR \
--pkgtype pkg
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.profile }}-${{ matrix.os }}-${{ matrix.arch }}${{ matrix.with_elixir == 'yes' && '-elixir' || '' }}-${{ matrix.builder }}-${{ matrix.otp }}-${{ matrix.elixir }}
path: _packages/${{ matrix.profile }}/
@ -198,7 +198,7 @@ jobs:
profile:
- ${{ inputs.profile }}
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
pattern: "${{ matrix.profile }}-*"
path: packages/${{ matrix.profile }}

View File

@ -23,6 +23,7 @@ jobs:
profile:
- ['emqx', 'master']
- ['emqx', 'release-57']
- ['emqx', 'release-58']
os:
- ubuntu22.04
- amzn2023
@ -53,7 +54,7 @@ jobs:
- name: build pkg
run: |
./scripts/buildx.sh --profile "$PROFILE" --pkgtype pkg --builder "$BUILDER"
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: ${{ matrix.profile[0] }}-${{ matrix.profile[1] }}-${{ matrix.os }}
@ -101,7 +102,7 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: ${{ matrix.profile }}-${{ matrix.os }}

View File

@ -41,13 +41,13 @@ jobs:
- name: build pkg
run: |
./scripts/buildx.sh --profile $PROFILE --pkgtype pkg --elixir $ELIXIR --arch $ARCH
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ matrix.profile[0] }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"
path: _packages/${{ matrix.profile[0] }}/*
retention-days: 7
compression-level: 0
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: "${{ matrix.profile[0] }}-schema-dump-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"
path: |
@ -84,7 +84,7 @@ jobs:
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: ${{ matrix.os }}
path: _packages/**/*

View File

@ -37,7 +37,7 @@ jobs:
- run: ./scripts/check-elixir-deps-discrepancies.exs
- run: ./scripts/check-elixir-applications.exs
- name: Upload produced lock files
uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: ${{ matrix.profile }}_produced_lock_files

View File

@ -24,6 +24,7 @@ jobs:
branch:
- master
- release-57
- release-58
language:
- cpp
- python

View File

@ -24,6 +24,7 @@ jobs:
ref:
- master
- release-57
- release-58
steps:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:

View File

@ -52,7 +52,7 @@ jobs:
id: package_file
run: |
echo "PACKAGE_FILE=$(find _packages/emqx -name 'emqx-*.deb' | head -n 1 | xargs basename)" >> $GITHUB_OUTPUT
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: emqx-ubuntu20.04
path: _packages/emqx/${{ steps.package_file.outputs.PACKAGE_FILE }}
@ -77,7 +77,7 @@ jobs:
repository: emqx/tf-emqx-performance-test
path: tf-emqx-performance-test
ref: v0.2.3
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-ubuntu20.04
path: tf-emqx-performance-test/
@ -113,13 +113,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform
@ -148,7 +148,7 @@ jobs:
repository: emqx/tf-emqx-performance-test
path: tf-emqx-performance-test
ref: v0.2.3
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-ubuntu20.04
path: tf-emqx-performance-test/
@ -184,13 +184,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform
@ -220,7 +220,7 @@ jobs:
repository: emqx/tf-emqx-performance-test
path: tf-emqx-performance-test
ref: v0.2.3
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-ubuntu20.04
path: tf-emqx-performance-test/
@ -257,13 +257,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform
@ -294,7 +294,7 @@ jobs:
repository: emqx/tf-emqx-performance-test
path: tf-emqx-performance-test
ref: v0.2.3
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-ubuntu20.04
path: tf-emqx-performance-test/
@ -330,13 +330,13 @@ jobs:
working-directory: ./tf-emqx-performance-test
run: |
terraform destroy -auto-approve
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: success()
with:
name: metrics
path: |
"./tf-emqx-performance-test/*.tar.gz"
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: terraform

View File

@ -25,7 +25,7 @@ jobs:
- emqx
- emqx-enterprise
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ matrix.profile }}
- name: extract artifact
@ -40,7 +40,7 @@ jobs:
if: failure()
run: |
cat _build/${{ matrix.profile }}/rel/emqx/log/erlang.log.*
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: conftest-logs-${{ matrix.profile }}

View File

@ -35,7 +35,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh "$EMQX_NAME")
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ env.EMQX_NAME }}-docker
path: /tmp
@ -69,7 +69,6 @@ jobs:
shell: bash
env:
EMQX_NAME: ${{ matrix.profile }}
_EMQX_TEST_DB_BACKEND: ${{ matrix.cluster_db_backend }}
strategy:
fail-fast: false
@ -78,18 +77,20 @@ jobs:
- emqx
- emqx-enterprise
- emqx-elixir
cluster_db_backend:
- mnesia
- rlog
steps:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- name: Set up environment
id: env
run: |
source env.sh
if [ "$EMQX_NAME" = "emqx-enterprise" ]; then
_EMQX_TEST_DB_BACKEND='rlog'
else
_EMQX_TEST_DB_BACKEND='mnesia'
fi
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh "$EMQX_NAME")
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ env.EMQX_NAME }}-docker
path: /tmp

View File

@ -95,7 +95,7 @@ jobs:
echo "Suites: $SUITES"
./rebar3 as standalone_test ct --name 'test@127.0.0.1' -v --readable=true --suite="$SUITES"
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: logs-emqx-app-tests-${{ matrix.type }}

View File

@ -44,7 +44,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh "$EMQX_NAME")
echo "EMQX_TAG=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: "${{ env.EMQX_NAME }}-docker"
path: /tmp

View File

@ -31,7 +31,7 @@ jobs:
else
wget --no-verbose --no-check-certificate -O /tmp/apache-jmeter.tgz $ARCHIVE_URL
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: apache-jmeter.tgz
path: /tmp/apache-jmeter.tgz
@ -58,7 +58,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-docker
path: /tmp
@ -95,7 +95,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-advanced_feat-${{ matrix.scripts_type }}
@ -127,7 +127,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-docker
path: /tmp
@ -175,7 +175,7 @@ jobs:
if: failure()
run: |
docker compose -f .ci/docker-compose-file/docker-compose-emqx-cluster.yaml logs --no-color > ./jmeter_logs/emqx.log
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-pgsql_authn_authz-${{ matrix.scripts_type }}_${{ matrix.pgsql_tag }}
@ -204,7 +204,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-docker
path: /tmp
@ -248,7 +248,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-mysql_authn_authz-${{ matrix.scripts_type }}_${{ matrix.mysql_tag }}
@ -273,7 +273,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-docker
path: /tmp
@ -313,7 +313,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-JWT_authn-${{ matrix.scripts_type }}
@ -339,7 +339,7 @@ jobs:
source env.sh
PKG_VSN=$(docker run --rm -v $(pwd):$(pwd) -w $(pwd) -u $(id -u) "$EMQX_BUILDER" ./pkg-vsn.sh emqx)
echo "PKG_VSN=$PKG_VSN" >> "$GITHUB_ENV"
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-docker
path: /tmp
@ -370,7 +370,7 @@ jobs:
echo "check logs failed"
exit 1
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: always()
with:
name: jmeter_logs-built_in_database_authn_authz-${{ matrix.scripts_type }}

View File

@ -25,7 +25,7 @@ jobs:
run:
shell: bash
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: emqx-enterprise
- name: extract artifact
@ -45,7 +45,7 @@ jobs:
run: |
export PROFILE='emqx-enterprise'
make emqx-enterprise-tgz
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
name: Upload built emqx and test scenario
with:
name: relup_tests_emqx_built
@ -72,7 +72,7 @@ jobs:
run:
shell: bash
steps:
- uses: erlef/setup-beam@a6e26b22319003294c58386b6f25edbc7336819a # v1.18.0
- uses: erlef/setup-beam@b9c58b0450cd832ccdb3c17cc156a47065d2114f # v1.18.1
with:
otp-version: 26.2.5
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
@ -88,7 +88,7 @@ jobs:
./configure
make
echo "$(pwd)/bin" >> $GITHUB_PATH
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
name: Download built emqx and test scenario
with:
name: relup_tests_emqx_built
@ -111,7 +111,7 @@ jobs:
docker logs node2.emqx.io | tee lux_logs/emqx2.log
exit 1
fi
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
name: Save debug data
if: failure()
with:

View File

@ -46,7 +46,7 @@ jobs:
contents: read
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ matrix.profile }}
@ -90,7 +90,7 @@ jobs:
contents: read
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ matrix.profile }}
- name: extract artifact
@ -133,7 +133,7 @@ jobs:
if: failure()
run: tar -czf logs.tar.gz _build/test/logs
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: logs-${{ matrix.profile }}-${{ matrix.prefix }}-sg${{ matrix.suitegroup }}
@ -164,7 +164,7 @@ jobs:
CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-sg${{ matrix.suitegroup }}
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ matrix.profile }}
- name: extract artifact
@ -193,7 +193,7 @@ jobs:
if: failure()
run: tar -czf logs.tar.gz _build/test/logs
- uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
- uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
if: failure()
with:
name: logs-${{ matrix.profile }}-${{ matrix.prefix }}-sg${{ matrix.suitegroup }}

View File

@ -30,7 +30,7 @@ jobs:
persist-credentials: false
- name: "Run analysis"
uses: ossf/scorecard-action@dc50aa9510b46c811795eb24b2f1ba02a914e534 # v2.3.3
uses: ossf/scorecard-action@62b2cac7ed8198b15735ed49ab1e5cf35480ba46 # v2.4.0
with:
results_file: results.sarif
results_format: sarif
@ -40,7 +40,7 @@ jobs:
publish_results: true
- name: "Upload artifact"
uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3
uses: actions/upload-artifact@89ef406dd8d7e03cfd12d9e0a4a378f454709029 # v4.3.5
with:
name: SARIF file
path: results.sarif

View File

@ -19,7 +19,7 @@ jobs:
- emqx-enterprise
runs-on: ${{ endsWith(github.repository, '/emqx') && 'ubuntu-22.04' || fromJSON('["self-hosted","ephemeral","linux","x64"]') }}
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
pattern: "${{ matrix.profile }}-schema-dump-*-x64"
merge-multiple: true

View File

@ -30,7 +30,7 @@ jobs:
include: ${{ fromJson(inputs.ct-matrix) }}
container: "${{ inputs.builder }}"
steps:
- uses: actions/download-artifact@65a9edc5881444af0b9093a5e628f2fe47ea3b2e # v4.1.7
- uses: actions/download-artifact@fa0a91b85d4f404e444e00e005971372dc801d16 # v4.1.8
with:
name: ${{ matrix.profile }}
- name: extract artifact

View File

@ -0,0 +1,88 @@
name: Sync release branch
concurrency:
group: sync-release-branch-${{ github.event_name }}-${{ github.ref }}
cancel-in-progress: true
on:
schedule:
- cron: '0 2 * * *'
workflow_dispatch:
permissions:
contents: read
jobs:
create-pr:
runs-on: ${{ endsWith(github.repository, '/emqx') && 'ubuntu-22.04' || fromJSON('["self-hosted","ephemeral","linux","x64"]') }}
strategy:
fail-fast: false
matrix:
branch:
- release-57
env:
SYNC_BRANCH: ${{ matrix.branch }}
defaults:
run:
shell: bash
permissions:
contents: write
pull-requests: write
steps:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
fetch-depth: 0
- name: create new branch
run: |
set -euxo pipefail
NEW_BRANCH_NAME=sync-${SYNC_BRANCH}-$(date +"%Y%m%d-%H%M%S")
echo "NEW_BRANCH_NAME=${NEW_BRANCH_NAME}" >> $GITHUB_ENV
git config --global user.name "${GITHUB_ACTOR}"
git config --global user.email "${GITHUB_ACTOR}@users.noreply.github.com"
git checkout -b ${NEW_BRANCH_NAME}
git merge origin/${SYNC_BRANCH} 2>&1 | tee merge.log
git push origin ${NEW_BRANCH_NAME}:${NEW_BRANCH_NAME}
- name: create pull request
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
set -euxo pipefail
for pr in $(gh pr list --state open --base master --label sync-release-branch --search "Sync ${SYNC_BRANCH} in:title" --repo ${{ github.repository }} --json number --jq '.[] | .number'); do
gh pr close $pr --repo ${{ github.repository }} --delete-branch || true
done
gh pr create --title "Sync ${SYNC_BRANCH}" --body "Sync ${SYNC_BRANCH}" --base master --head ${NEW_BRANCH_NAME} --label sync-release-branch --repo ${{ github.repository }}
- name: Send notification to Slack
if: failure()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
run: |
awk '{printf "%s\\n", $0}' merge.log > merge.log.1
cat <<EOF > payload.json
{
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "Automatic sync of ${SYNC_BRANCH} branch failed: https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "\`\`\`$(cat merge.log.1)\`\`\`"
}
}
]
}
EOF
curl -X POST -H 'Content-type: application/json' --data @payload.json "$SLACK_WEBHOOK_URL"

View File

@ -10,8 +10,8 @@ include env.sh
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.9.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.7.1
export EMQX_DASHBOARD_VERSION ?= v1.10.0-beta.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.8.0-beta.1
export EMQX_RELUP ?= true
export EMQX_REL_FORM ?= tgz
@ -28,6 +28,8 @@ CT_COVER_EXPORT_PREFIX ?= $(PROFILE)
export REBAR_GIT_CLONE_OPTIONS += --depth=1
ELIXIR_COMMON_DEPS := ensure-hex ensure-mix-rebar3 ensure-mix-rebar
.PHONY: default
default: $(REBAR) $(PROFILE)
@ -58,8 +60,12 @@ ensure-mix-rebar3: $(REBAR)
ensure-mix-rebar: $(REBAR)
@mix local.rebar --if-missing --force
.PHONY: elixir-common-deps
elixir-common-deps: $(ELIXIR_COMMON_DEPS)
.PHONY: mix-deps-get
mix-deps-get: $(ELIXIR_COMMON_DEPS)
mix-deps-get: elixir-common-deps
@mix deps.get
.PHONY: eunit
@ -238,7 +244,7 @@ $(foreach zt,$(ALL_ZIPS),$(eval $(call download-relup-packages,$(zt))))
## relup target is to create relup instructions
.PHONY: $(REL_PROFILES:%=%-relup)
define gen-relup-target
$1-relup: $1-relup-downloads $(COMMON_DEPS)
$1-relup: $(COMMON_DEPS)
@$(BUILD) $1 relup
endef
ALL_TGZS = $(REL_PROFILES)
@ -247,7 +253,7 @@ $(foreach zt,$(ALL_TGZS),$(eval $(call gen-relup-target,$(zt))))
## tgz target is to create a release package .tar.gz with relup
.PHONY: $(REL_PROFILES:%=%-tgz)
define gen-tgz-target
$1-tgz: $1-relup
$1-tgz: $(COMMON_DEPS)
@$(BUILD) $1 tgz
endef
ALL_TGZS = $(REL_PROFILES)

View File

@ -65,9 +65,20 @@
%% Route
%%--------------------------------------------------------------------
-record(share_dest, {
session_id :: emqx_session:session_id(),
group :: emqx_types:group()
}).
-record(route, {
topic :: binary(),
dest :: node() | {binary(), node()} | emqx_session:session_id() | emqx_external_broker:dest()
dest ::
node()
| {binary(), node()}
| emqx_session:session_id()
%% One session can also have multiple subscriptions to the same topic through different groups
| #share_dest{}
| emqx_external_broker:dest()
}).
%%--------------------------------------------------------------------

View File

@ -683,6 +683,7 @@ end).
-define(FRAME_PARSE_ERROR, frame_parse_error).
-define(FRAME_SERIALIZE_ERROR, frame_serialize_error).
-define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})).
-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})).

View File

@ -32,7 +32,7 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.7.1").
-define(EMQX_RELEASE_CE, "5.8.0-alpha.1").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.7.1").
-define(EMQX_RELEASE_EE, "5.8.0-alpha.1").

View File

@ -41,16 +41,20 @@
).
%% NOTE: do not forget to use atom for msg and add every used msg to
%% the default value of `log.thorttling.msgs` list.
%% the default value of `log.throttling.msgs` list.
-define(SLOG_THROTTLE(Level, Data),
?SLOG_THROTTLE(Level, Data, #{})
).
-define(SLOG_THROTTLE(Level, Data, Meta),
?SLOG_THROTTLE(Level, undefined, Data, Meta)
).
-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta),
case logger:allow(Level, ?MODULE) of
true ->
(fun(#{msg := __Msg} = __Data) ->
case emqx_log_throttler:allow(__Msg) of
case emqx_log_throttler:allow(__Msg, UniqueKey) of
true ->
logger:log(Level, __Data, Meta);
false ->
@ -87,7 +91,7 @@
?_DO_TRACE(Tag, Msg, Meta),
?SLOG(
Level,
(emqx_trace_formatter:format_meta_map(Meta))#{msg => Msg, tag => Tag},
(Meta)#{msg => Msg, tag => Tag},
#{is_trace => false}
)
end).

View File

@ -8,7 +8,7 @@ defmodule EMQX.MixProject do
app: :emqx,
version: "0.1.0",
build_path: "../../_build",
erlc_paths: UMP.erlc_paths(),
erlc_paths: erlc_paths(),
erlc_options: [
{:i, "src"}
| UMP.erlc_options()
@ -36,8 +36,9 @@ defmodule EMQX.MixProject do
def deps() do
## FIXME!!! go though emqx.app.src and add missing stuff...
[
{:emqx_mix_utils, in_umbrella: true, runtime: false},
{:emqx_utils, in_umbrella: true},
# {:emqx_ds_backends, in_umbrella: true},
{:emqx_ds_backends, in_umbrella: true},
UMP.common_dep(:gproc),
UMP.common_dep(:gen_rpc),
@ -53,6 +54,15 @@ defmodule EMQX.MixProject do
] ++ UMP.quicer_dep()
end
defp erlc_paths() do
paths = UMP.erlc_paths()
if UMP.test_env?() do
["integration_test" | paths]
else
paths
end
end
defp extra_dirs() do
dirs = ["src", "etc"]
if UMP.test_env?() do

View File

@ -10,12 +10,14 @@
{emqx_bridge,5}.
{emqx_bridge,6}.
{emqx_broker,1}.
{emqx_cluster_link,1}.
{emqx_cm,1}.
{emqx_cm,2}.
{emqx_cm,3}.
{emqx_conf,1}.
{emqx_conf,2}.
{emqx_conf,3}.
{emqx_conf,4}.
{emqx_connector,1}.
{emqx_dashboard,1}.
{emqx_delayed,1}.
@ -25,6 +27,7 @@
{emqx_ds,2}.
{emqx_ds,3}.
{emqx_ds,4}.
{emqx_ds_shared_sub,1}.
{emqx_eviction_agent,1}.
{emqx_eviction_agent,2}.
{emqx_eviction_agent,3}.
@ -47,6 +50,7 @@
{emqx_mgmt_api_plugins,1}.
{emqx_mgmt_api_plugins,2}.
{emqx_mgmt_api_plugins,3}.
{emqx_mgmt_api_relup,1}.
{emqx_mgmt_cluster,1}.
{emqx_mgmt_cluster,2}.
{emqx_mgmt_cluster,3}.
@ -59,7 +63,6 @@
{emqx_node_rebalance_api,1}.
{emqx_node_rebalance_api,2}.
{emqx_node_rebalance_evacuation,1}.
{emqx_node_rebalance_purge,1}.
{emqx_node_rebalance_status,1}.
{emqx_node_rebalance_status,2}.
{emqx_persistent_session_ds,1}.

View File

@ -28,15 +28,14 @@
{lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
{ra, "2.7.3"}
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}}
]}.
{plugins, [{rebar3_proper, "0.12.1"}, rebar3_path_deps]}.

View File

@ -2,7 +2,7 @@
{application, emqx, [
{id, "emqx"},
{description, "EMQX Core"},
{vsn, "5.3.3"},
{vsn, "5.3.4"},
{modules, []},
{registered, []},
{applications, [

View File

@ -16,6 +16,8 @@
-module(emqx_banned).
-feature(maybe_expr, enable).
-behaviour(gen_server).
-behaviour(emqx_db_backup).
@ -49,6 +51,7 @@
handle_call/3,
handle_cast/2,
handle_info/2,
handle_continue/2,
terminate/2,
code_change/3
]).
@ -137,7 +140,7 @@ format(#banned{
until => to_rfc3339(Until)
}.
-spec parse(map()) -> emqx_types:banned() | {error, term()}.
-spec parse(map()) -> {ok, emqx_types:banned()} | {error, term()}.
parse(Params) ->
case parse_who(Params) of
{error, Reason} ->
@ -149,13 +152,13 @@ parse(Params) ->
Until = maps:get(<<"until">>, Params, At + ?EXPIRATION_TIME),
case Until > erlang:system_time(second) of
true ->
#banned{
{ok, #banned{
who = Who,
by = By,
reason = Reason,
at = At,
until = Until
};
}};
false ->
ErrorReason =
io_lib:format("Cannot create expired banned, ~p to ~p", [At, Until]),
@ -239,12 +242,139 @@ who(peerhost_net, CIDR) when is_tuple(CIDR) -> {peerhost_net, CIDR};
who(peerhost_net, CIDR) when is_binary(CIDR) ->
{peerhost_net, esockd_cidr:parse(binary_to_list(CIDR), true)}.
%%--------------------------------------------------------------------
%% Import From CSV
%%--------------------------------------------------------------------
init_from_csv(undefined) ->
ok;
init_from_csv(File) ->
maybe
core ?= mria_rlog:role(),
'$end_of_table' ?= mnesia:dirty_first(?BANNED_RULE_TAB),
'$end_of_table' ?= mnesia:dirty_first(?BANNED_INDIVIDUAL_TAB),
{ok, Bin} ?= file:read_file(File),
Stream = emqx_utils_stream:csv(Bin, #{nullable => true, filter_null => true}),
{ok, List} ?= parse_stream(Stream),
import_from_stream(List),
?SLOG(info, #{
msg => "load_banned_bootstrap_file_succeeded",
file => File
})
else
replicant ->
ok;
{Name, _} when
Name == peerhost;
Name == peerhost_net;
Name == clientid_re;
Name == username_re;
Name == clientid;
Name == username
->
ok;
{error, Reason} = Error ->
?SLOG(error, #{
msg => "load_banned_bootstrap_file_failed",
reason => Reason,
file => File
}),
Error
end.
import_from_stream(Stream) ->
Groups = maps:groups_from_list(
fun(#banned{who = Who}) -> table(Who) end, Stream
),
maps:foreach(
fun(Tab, Items) ->
Trans = fun() ->
lists:foreach(
fun(Item) ->
mnesia:write(Tab, Item, write)
end,
Items
)
end,
case trans(Trans) of
{ok, _} ->
?SLOG(info, #{
msg => "import_banned_from_stream_succeeded",
items => Items
});
{error, Reason} ->
?SLOG(error, #{
msg => "import_banned_from_stream_failed",
reason => Reason,
items => Items
})
end
end,
Groups
).
parse_stream(Stream) ->
try
List = emqx_utils_stream:consume(Stream),
parse_stream(List, [], [])
catch
error:Reason ->
{error, Reason}
end.
parse_stream([Item | List], Ok, Error) ->
maybe
{ok, Item1} ?= normalize_parse_item(Item),
{ok, Banned} ?= parse(Item1),
parse_stream(List, [Banned | Ok], Error)
else
{error, _} ->
parse_stream(List, Ok, [Item | Error])
end;
parse_stream([], Ok, []) ->
{ok, Ok};
parse_stream([], Ok, Error) ->
?SLOG(warning, #{
msg => "invalid_banned_items",
items => Error
}),
{ok, Ok}.
normalize_parse_item(#{<<"as">> := As} = Item) ->
ParseTime = fun(Name, Input) ->
maybe
#{Name := Time} ?= Input,
{ok, Epoch} ?= emqx_utils_calendar:to_epoch_second(emqx_utils_conv:str(Time)),
{ok, Input#{Name := Epoch}}
else
{error, _} = Error ->
Error;
NoTime when is_map(NoTime) ->
{ok, NoTime}
end
end,
maybe
{ok, Type} ?= emqx_utils:safe_to_existing_atom(As),
{ok, Item1} ?= ParseTime(<<"at">>, Item#{<<"as">> := Type}),
ParseTime(<<"until">>, Item1)
end;
normalize_parse_item(_Item) ->
{error, invalid_item}.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
{ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
{ok, ensure_expiry_timer(#{expiry_timer => undefined}), {continue, init_from_csv}}.
handle_continue(init_from_csv, State) ->
File = emqx_schema:naive_env_interpolation(
emqx:get_config([banned, bootstrap_file], undefined)
),
_ = init_from_csv(File),
{noreply, State}.
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
@ -255,7 +385,7 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
_ = mria:transaction(?COMMON_SHARD, fun ?MODULE:expire_banned_items/1, [
_ = trans(fun ?MODULE:expire_banned_items/1, [
erlang:system_time(second)
]),
{noreply, ensure_expiry_timer(State), hibernate};
@ -396,3 +526,15 @@ on_banned(_) ->
all_rules() ->
ets:tab2list(?BANNED_RULE_TAB).
trans(Fun) ->
case mria:transaction(?COMMON_SHARD, Fun) of
{atomic, Res} -> {ok, Res};
{aborted, Reason} -> {error, Reason}
end.
trans(Fun, Args) ->
case mria:transaction(?COMMON_SHARD, Fun, Args) of
{atomic, Res} -> {ok, Res};
{aborted, Reason} -> {error, Reason}
end.
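The `handle_continue(init_from_csv, ...)` clause above reads the bootstrap file path from the `[banned, bootstrap_file]` config key and passes it through `emqx_schema:naive_env_interpolation/1`. A minimal sketch of how this could be configured, assuming HOCON syntax; the file path is a placeholder, and the CSV column layout (`as`, `who`, `by`, `reason`, `at`, `until`) is only inferred from the parser above, not confirmed here:
```
# emqx.conf (illustrative)
banned {
  bootstrap_file = "etc/banned-bootstrap.csv"
}
```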

View File

@ -146,7 +146,9 @@
-type replies() :: emqx_types:packet() | reply() | [reply()].
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
-define(IS_CONNECTED_OR_REAUTHENTICATING(ConnState),
((ConnState == connected) orelse (ConnState == reauthenticating))
).
-define(IS_COMMON_SESSION_TIMER(N),
((N == retry_delivery) orelse (N == expire_awaiting_rel))
).
@ -235,7 +237,7 @@ caps(#channel{clientinfo = #{zone := Zone}}) ->
-spec init(emqx_types:conninfo(), opts()) -> channel().
init(
ConnInfo = #{
peername := {PeerHost, PeerPort},
peername := {PeerHost, PeerPort} = PeerName,
sockname := {_Host, SockPort}
},
#{
@ -259,6 +261,9 @@ init(
listener => ListenerId,
protocol => Protocol,
peerhost => PeerHost,
%% We copy peername to clientinfo because some event contexts only have access
%% to client info (e.g.: authn/authz).
peername => PeerName,
peerport => PeerPort,
sockport => SockPort,
clientid => undefined,
@ -334,7 +339,7 @@ take_conn_info_fields(Fields, ClientInfo, ConnInfo) ->
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}.
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
@ -564,29 +569,8 @@ handle_in(
process_disconnect(ReasonCode, Properties, NChannel);
handle_in(?AUTH_PACKET(), Channel) ->
handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(shutdown_count(frame_error, Reason), Channel);
handle_in(
{frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting}
) ->
shutdown(
shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
handle_in(
{frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
{ok, Channel};
handle_in({frame_error, Reason}, Channel) ->
handle_frame_error(Reason, Channel);
handle_in(Packet, Channel) ->
?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
@ -1018,6 +1002,68 @@ not_nacked({deliver, _Topic, Msg}) ->
true
end.
%%--------------------------------------------------------------------
%% Handle Frame Error
%%--------------------------------------------------------------------
handle_frame_error(
Reason = #{cause := frame_too_large},
Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
) when
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
ShutdownCount = shutdown_count(frame_error, Reason),
case proto_ver(Reason, ConnInfo) of
?MQTT_PROTO_V5 ->
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
_ ->
shutdown(ShutdownCount, Channel)
end;
%% Only send CONNACK with reason code `frame_too_large` for MQTT-v5.0 when connecting,
%% otherwise DONOT send any CONNACK or DISCONNECT packet.
handle_frame_error(
Reason,
Channel = #channel{conn_state = ConnState, conninfo = ConnInfo}
) when
is_map(Reason) andalso
(ConnState == idle orelse ConnState == connecting)
->
ShutdownCount = shutdown_count(frame_error, Reason),
ProtoVer = proto_ver(Reason, ConnInfo),
NChannel = Channel#channel{conninfo = ConnInfo#{proto_ver => ProtoVer}},
case ProtoVer of
?MQTT_PROTO_V5 ->
shutdown(ShutdownCount, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), NChannel);
_ ->
shutdown(ShutdownCount, NChannel)
end;
handle_frame_error(
Reason,
Channel = #channel{conn_state = connecting}
) ->
shutdown(
shutdown_count(frame_error, Reason),
?CONNACK_PACKET(?RC_MALFORMED_PACKET),
Channel
);
handle_frame_error(
Reason,
Channel = #channel{conn_state = ConnState}
) when
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
handle_out(
disconnect,
{?RC_MALFORMED_PACKET, Reason},
Channel
);
handle_frame_error(
Reason,
Channel = #channel{conn_state = disconnected}
) ->
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle outgoing packet
%%--------------------------------------------------------------------
@ -1286,7 +1332,7 @@ handle_info(
session = Session
}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
{Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
@ -2633,8 +2679,7 @@ save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) ->
NAliases = maps:put(Topic, AliasId, Aliases),
TopicAliases#{outbound => NAliases}.
-compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}).
-compile({inline, [reply/2, shutdown/2, shutdown/3]}).
reply(Reply, Channel) ->
{reply, Reply, Channel}.
@ -2670,13 +2715,13 @@ disconnect_and_shutdown(
?IS_MQTT_V5 =
#channel{conn_state = ConnState}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
%% mqtt v3/v4 connected sessions
disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when
ConnState =:= connected orelse ConnState =:= reauthenticating
?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
->
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, Reply, NChannel);
@ -2719,6 +2764,13 @@ is_durable_session(#channel{session = Session}) ->
false
end.
proto_ver(#{proto_ver := ProtoVer}, _ConnInfo) ->
ProtoVer;
proto_ver(_Reason, #{proto_ver := ProtoVer}) ->
ProtoVer;
proto_ver(_, _) ->
?MQTT_PROTO_V4.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------

View File

@ -499,15 +499,14 @@ fill_defaults(RawConf, Opts) ->
).
-spec fill_defaults(module(), raw_config(), hocon_tconf:opts()) -> map().
fill_defaults(_SchemaMod, RawConf = #{<<"durable_storage">> := _}, _) ->
fill_defaults(SchemaMod, RawConf = #{<<"durable_storage">> := Ds}, Opts) ->
%% FIXME: kludge to prevent `emqx_config' module from filling in
%% the default values for backends and layouts. These records are
%% inside unions, and adding default values there will add
%% incompatible fields.
%%
%% Note: this function is called for each individual conf root, so
%% this clause only affects this particular subtree.
RawConf;
RawConf1 = maps:remove(<<"durable_storage">>, RawConf),
Conf = fill_defaults(SchemaMod, RawConf1, Opts),
Conf#{<<"durable_storage">> => Ds};
fill_defaults(SchemaMod, RawConf, Opts0) ->
Opts = maps:merge(#{required => false, make_serializable => true}, Opts0),
hocon_tconf:check_plain(

View File

@ -173,7 +173,9 @@
system_code_change/4
]}
).
-dialyzer({no_missing_calls, [handle_msg/2]}).
-ifndef(BUILD_WITHOUT_QUIC).
-spec start_link
(esockd:transport(), esockd:socket(), emqx_channel:opts()) ->
{ok, pid()};
@ -183,6 +185,9 @@
emqx_quic_connection:cb_state()
) ->
{ok, pid()}.
-else.
-spec start_link(esockd:transport(), esockd:socket(), emqx_channel:opts()) -> {ok, pid()}.
-endif.
start_link(Transport, Socket, Options) ->
Args = [self(), Transport, Socket, Options],
@ -468,19 +473,17 @@ cancel_stats_timer(State) ->
process_msg([], State) ->
{ok, State};
process_msg([Msg | More], State) ->
try
case handle_msg(Msg, State) of
ok ->
process_msg(More, State);
{ok, NState} ->
process_msg(More, NState);
{ok, Msgs, NState} ->
process_msg(append_msg(More, Msgs), NState);
{stop, Reason, NState} ->
{stop, Reason, NState};
{stop, Reason} ->
{stop, Reason, State}
end
try handle_msg(Msg, State) of
ok ->
process_msg(More, State);
{ok, NState} ->
process_msg(More, NState);
{ok, Msgs, NState} ->
process_msg(append_msg(More, Msgs), NState);
{stop, Reason, NState} ->
{stop, Reason, NState};
{stop, Reason} ->
{stop, Reason, State}
catch
exit:normal ->
{stop, normal, State};
@ -780,7 +783,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
input_bytes => Data,
parsed_packets => Packets
}),
{[{frame_error, Reason} | Packets], State};
NState = enrich_state(Reason, State),
{[{frame_error, Reason} | Packets], NState};
error:Reason:Stacktrace ->
?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState),
@ -1224,6 +1228,12 @@ inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc),
ok.
enrich_state(#{parse_state := NParseState}, State) ->
Serialize = emqx_frame:serialize_opts(NParseState),
State#state{parse_state = NParseState, serialize = Serialize};
enrich_state(_, State) ->
State.
set_tcp_keepalive({quic, _Listener}) ->
ok;
set_tcp_keepalive({Type, Id}) ->

View File

@ -234,6 +234,42 @@ fields(layout_builtin_wildcard_optimized) ->
}
)}
];
fields(layout_builtin_wildcard_optimized_v2) ->
[
{type,
sc(
wildcard_optimized_v2,
#{
'readOnly' => true,
default => wildcard_optimized_v2,
desc => ?DESC(layout_builtin_wildcard_optimized_type)
}
)},
{bytes_per_topic_level,
sc(
range(1, 16),
#{
default => 8,
importance => ?IMPORTANCE_HIDDEN
}
)},
{topic_index_bytes,
sc(
pos_integer(),
#{
default => 8,
importance => ?IMPORTANCE_HIDDEN
}
)},
{serialization_schema,
sc(
emqx_ds_msg_serializer:schema(),
#{
default => v1,
importance => ?IMPORTANCE_HIDDEN
}
)}
];
fields(layout_builtin_reference) ->
[
{type,
@ -242,6 +278,7 @@ fields(layout_builtin_reference) ->
#{
'readOnly' => true,
importance => ?IMPORTANCE_LOW,
default => reference,
desc => ?DESC(layout_builtin_reference_type)
}
)}
@ -284,7 +321,7 @@ common_builtin_fields() ->
importance => ?IMPORTANCE_MEDIUM,
default =>
#{
<<"type">> => wildcard_optimized
<<"type">> => wildcard_optimized_v2
}
}
)}
@ -298,6 +335,8 @@ desc(builtin_write_buffer) ->
?DESC(builtin_write_buffer);
desc(layout_builtin_wildcard_optimized) ->
?DESC(layout_builtin_wildcard_optimized);
desc(layout_builtin_wildcard_optimized_v2) ->
?DESC(layout_builtin_wildcard_optimized);
desc(layout_builtin_reference) ->
?DESC(layout_builtin_reference);
desc(_) ->
@ -307,6 +346,19 @@ desc(_) ->
%% Internal functions
%%================================================================================
translate_layout(
#{
type := wildcard_optimized_v2,
bytes_per_topic_level := BytesPerTopicLevel,
topic_index_bytes := TopicIndexBytes,
serialization_schema := SSchema
}
) ->
{emqx_ds_storage_skipstream_lts, #{
wildcard_hash_bytes => BytesPerTopicLevel,
topic_index_bytes => TopicIndexBytes,
serialization_schema => SSchema
}};
translate_layout(
#{
type := wildcard_optimized,
@ -336,7 +388,11 @@ builtin_layouts() ->
%% suitable for production use. However, it's very simple and
%% produces a very predictable replay order, which can be useful
%% for testing and debugging:
[ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference)].
[
ref(layout_builtin_wildcard_optimized_v2),
ref(layout_builtin_wildcard_optimized),
ref(layout_builtin_reference)
].
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
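%% A rough illustration of the new wildcard_optimized_v2 clause of translate_layout/1,
%% using the schema defaults declared above (sketch only):
%%   translate_layout(#{
%%       type => wildcard_optimized_v2,
%%       bytes_per_topic_level => 8,
%%       topic_index_bytes => 8,
%%       serialization_schema => v1
%%   }) ->
%%       {emqx_ds_storage_skipstream_lts, #{
%%           wildcard_hash_bytes => 8,
%%           topic_index_bytes => 8,
%%           serialization_schema => v1
%%       }}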

View File

@ -117,6 +117,13 @@ try_subscribe(ClientId, Topic) ->
write
),
allow;
[#exclusive_subscription{clientid = ClientId, topic = Topic}] ->
%% Fixes issue #13476.
%% With exclusive subscriptions, the client must call `unsubscribe` explicitly to release
%% the lock. However, the node holding the lock may go down, after which the client
%% reconnects to this node and resubscribes. We need to allow the resubscription,
%% otherwise the lock would never be released.
allow;
[_] ->
deny
end.
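%% The resulting decision table for try_subscribe/2, as implied by the clauses above
%% (illustrative summary):
%%   no #exclusive_subscription record for Topic        -> allow (lock acquired)
%%   record held by the same ClientId (reconnect case)  -> allow (issue 13476)
%%   record held by another client                      -> deny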

View File

@ -43,7 +43,9 @@
add_shared_route/2,
delete_shared_route/2,
add_persistent_route/2,
delete_persistent_route/2
delete_persistent_route/2,
add_persistent_shared_route/3,
delete_persistent_shared_route/3
]).
-export_type([dest/0]).
@ -129,6 +131,12 @@ add_persistent_route(Topic, ID) ->
delete_persistent_route(Topic, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
add_persistent_shared_route(Topic, Group, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
delete_persistent_shared_route(Topic, Group, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -267,28 +267,50 @@ packet(Header, Variable) ->
packet(Header, Variable, Payload) ->
#mqtt_packet{header = Header, variable = Variable, payload = Payload}.
parse_connect(FrameBin, StrictMode) ->
{ProtoName, Rest} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
case ProtoName of
<<"MQTT">> ->
ok;
<<"MQIsdp">> ->
ok;
_ ->
%% from spec: the server MAY send disconnect with reason code 0x84
%% we chose to close the socket because the client is likely not talking MQTT anyway
?PARSE_ERR(#{
cause => invalid_proto_name,
expected => <<"'MQTT' or 'MQIsdp'">>,
received => ProtoName
})
end,
parse_connect2(ProtoName, Rest, StrictMode).
parse_connect(FrameBin, Options = #{strict_mode := StrictMode}) ->
{ProtoName, Rest0} = parse_utf8_string_with_cause(FrameBin, StrictMode, invalid_proto_name),
%% There is no need to parse and check proto_ver if proto_name is invalid, so validate it first.
%% The consistency check between the `proto_name` and `proto_ver` fields is done later in `emqx_packet:check_proto_ver/2`.
_ = validate_proto_name(ProtoName),
{IsBridge, ProtoVer, Rest2} = parse_connect_proto_ver(Rest0),
NOptions = Options#{version => ProtoVer},
try
do_parse_connect(ProtoName, IsBridge, ProtoVer, Rest2, StrictMode)
catch
throw:{?FRAME_PARSE_ERROR, ReasonM} when is_map(ReasonM) ->
?PARSE_ERR(
ReasonM#{
proto_ver => ProtoVer,
proto_name => ProtoName,
parse_state => ?NONE(NOptions)
}
);
throw:{?FRAME_PARSE_ERROR, Reason} ->
?PARSE_ERR(
#{
cause => Reason,
proto_ver => ProtoVer,
proto_name => ProtoName,
parse_state => ?NONE(NOptions)
}
)
end.
parse_connect2(
do_parse_connect(
ProtoName,
<<BridgeTag:4, ProtoVer:4, UsernameFlagB:1, PasswordFlagB:1, WillRetainB:1, WillQoS:2,
WillFlagB:1, CleanStart:1, Reserved:1, KeepAlive:16/big, Rest2/binary>>,
IsBridge,
ProtoVer,
<<
UsernameFlagB:1,
PasswordFlagB:1,
WillRetainB:1,
WillQoS:2,
WillFlagB:1,
CleanStart:1,
Reserved:1,
KeepAlive:16/big,
Rest/binary
>>,
StrictMode
) ->
_ = validate_connect_reserved(Reserved),
@ -303,14 +325,14 @@ parse_connect2(
UsernameFlag = bool(UsernameFlagB),
PasswordFlag = bool(PasswordFlagB)
),
{Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
{Properties, Rest3} = parse_properties(Rest, ProtoVer, StrictMode),
{ClientId, Rest4} = parse_utf8_string_with_cause(Rest3, StrictMode, invalid_clientid),
ConnPacket = #mqtt_packet_connect{
proto_name = ProtoName,
proto_ver = ProtoVer,
%% For bridge mode, non-standard implementation
%% Invented by mosquitto, named 'try_private': https://mosquitto.org/man/mosquitto-conf-5.html
is_bridge = (BridgeTag =:= 8),
is_bridge = IsBridge,
clean_start = bool(CleanStart),
will_flag = WillFlag,
will_qos = WillQoS,
@ -343,16 +365,16 @@ parse_connect2(
unexpected_trailing_bytes => size(Rest7)
})
end;
parse_connect2(_ProtoName, Bin, _StrictMode) ->
%% sent less than 32 bytes
do_parse_connect(_ProtoName, _IsBridge, _ProtoVer, Bin, _StrictMode) ->
%% sent less than 24 bytes
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
parse_packet(
#mqtt_packet_header{type = ?CONNECT},
FrameBin,
#{strict_mode := StrictMode}
Options
) ->
parse_connect(FrameBin, StrictMode);
parse_connect(FrameBin, Options);
parse_packet(
#mqtt_packet_header{type = ?CONNACK},
<<AckFlags:8, ReasonCode:8, Rest/binary>>,
@ -516,6 +538,12 @@ parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
parse_packet_id(_) ->
?PARSE_ERR(invalid_packet_id).
parse_connect_proto_ver(<<BridgeTag:4, ProtoVer:4, Rest/binary>>) ->
{_IsBridge = (BridgeTag =:= 8), ProtoVer, Rest};
parse_connect_proto_ver(Bin) ->
%% sent less than 1 byte or empty
?PARSE_ERR(#{cause => malformed_connect, header_bytes => Bin}).
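%% A minimal sketch of the first-byte split performed by parse_connect_proto_ver/1
%% (illustrative inputs only):
%%   parse_connect_proto_ver(<<16#84, "x">>) -> {true, 4, <<"x">>}   %% bridge tag 8, MQTT v3.1.1
%%   parse_connect_proto_ver(<<16#05, "x">>) -> {false, 5, <<"x">>}  %% no bridge tag, MQTT v5
%%   parse_connect_proto_ver(<<>>)           -> throws a malformed_connect parse error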
parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 ->
{#{}, Bin};
%% TODO: version mess?
@ -739,6 +767,8 @@ serialize_fun(#{version := Ver, max_size := MaxSize, strict_mode := StrictMode})
initial_serialize_opts(Opts) ->
maps:merge(?DEFAULT_OPTIONS, Opts).
serialize_opts(?NONE(Options)) ->
maps:merge(?DEFAULT_OPTIONS, Options);
serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
#{version => ProtoVer, max_size => MaxSize, strict_mode => false}.
@ -1157,18 +1187,34 @@ validate_subqos([3 | _]) -> ?PARSE_ERR(bad_subqos);
validate_subqos([_ | T]) -> validate_subqos(T);
validate_subqos([]) -> ok.
%% from spec: the server MAY send disconnect with reason code 0x84
%% we chose to close the socket because the client is likely not talking MQTT anyway
validate_proto_name(<<"MQTT">>) ->
ok;
validate_proto_name(<<"MQIsdp">>) ->
ok;
validate_proto_name(ProtoName) ->
?PARSE_ERR(#{
cause => invalid_proto_name,
expected => <<"'MQTT' or 'MQIsdp'">>,
received => ProtoName
}).
%% MQTT-v3.1.1-[MQTT-3.1.2-3], MQTT-v5.0-[MQTT-3.1.2-3]
-compile({inline, [validate_connect_reserved/1]}).
validate_connect_reserved(0) -> ok;
validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag).
-compile({inline, [validate_connect_will/3]}).
%% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11]
validate_connect_will(false, _, WillQos) when WillQos > 0 -> ?PARSE_ERR(invalid_will_qos);
validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos);
%% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12]
validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos);
%% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13]
validate_connect_will(false, WillRetain, _) when WillRetain -> ?PARSE_ERR(invalid_will_retain);
validate_connect_will(_, _, _) -> ok.
-compile({inline, [validate_connect_password_flag/4]}).
%% MQTT-v3.1
%% Username flag and password flag are not strongly related
%% https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect
@ -1183,6 +1229,7 @@ validate_connect_password_flag(true, ?MQTT_PROTO_V5, _, _) ->
validate_connect_password_flag(_, _, _, _) ->
ok.
-compile({inline, [bool/1]}).
bool(0) -> false;
bool(1) -> true.

View File

@ -212,16 +212,29 @@ short_paths_fields() ->
short_paths_fields(Importance) ->
[
{Name,
?HOCON(rate_type(), #{
desc => ?DESC(Name),
required => false,
importance => Importance,
example => Example
})}
?HOCON(
rate_type(),
maps:merge(
#{
desc => ?DESC(Name),
required => false,
importance => Importance,
example => Example
},
short_paths_fields_extra(Name)
)
)}
|| {Name, Example} <-
lists:zip(short_paths(), [<<"1000/s">>, <<"1000/s">>, <<"100MB/s">>])
].
short_paths_fields_extra(max_conn_rate) ->
#{
default => infinity
};
short_paths_fields_extra(_Name) ->
#{}.
desc(limiter) ->
"Settings for the rate limiter.";
desc(node_opts) ->

View File

@ -64,6 +64,17 @@
-export_type([listener_id/0]).
-dialyzer(
{no_unknown, [
is_running/3,
current_conns/3,
do_stop_listener/3,
do_start_listener/4,
do_update_listener/4,
quic_listener_conf_rollback/3
]}
).
-type listener_id() :: atom() | binary().
-type listener_type() :: tcp | ssl | ws | wss | quic | dtls.
@ -421,7 +432,7 @@ do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTE
esockd:open(
Id,
ListenOn,
merge_default(esockd_opts(Id, Type, Name, Opts))
merge_default(esockd_opts(Id, Type, Name, Opts, _OldOpts = undefined))
);
%% Start MQTT/WS listener
do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
@ -465,7 +476,7 @@ do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when
Id = listener_id(Type, Name),
case maps:get(bind, OldConf) of
ListenOn ->
esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf, OldConf));
_Different ->
%% TODO
%% Again, we're not strictly required to drop live connections in this case.
@ -577,7 +588,7 @@ perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) ->
perform_listener_change(stop, {Type, Name, Conf}) ->
stop_listener(Type, Name, Conf).
esockd_opts(ListenerId, Type, Name, Opts0) ->
esockd_opts(ListenerId, Type, Name, Opts0, OldOpts) ->
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
Limiter = limiter(Opts0),
Opts2 =
@ -609,7 +620,7 @@ esockd_opts(ListenerId, Type, Name, Opts0) ->
tcp ->
Opts3#{tcp_options => tcp_opts(Opts0)};
ssl ->
OptsWithCRL = inject_crl_config(Opts0),
OptsWithCRL = inject_crl_config(Opts0, OldOpts),
OptsWithSNI = inject_sni_fun(ListenerId, OptsWithCRL),
OptsWithRootFun = inject_root_fun(OptsWithSNI),
OptsWithVerifyFun = inject_verify_fun(OptsWithRootFun),
@ -985,7 +996,7 @@ inject_sni_fun(_ListenerId, Conf) ->
Conf.
inject_crl_config(
Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}
Conf = #{ssl_options := #{enable_crl_check := true} = SSLOpts}, _OldOpts
) ->
HTTPTimeout = emqx_config:get([crl_cache, http_timeout], timer:seconds(15)),
Conf#{
@ -995,7 +1006,16 @@ inject_crl_config(
crl_cache => {emqx_ssl_crl_cache, {internal, [{http, HTTPTimeout}]}}
}
};
inject_crl_config(Conf) ->
inject_crl_config(#{ssl_options := SSLOpts0} = Conf0, #{} = OldOpts) ->
%% Note: we must set the CRL options to `undefined' to unset them. Otherwise,
%% `esockd' retains such options when `esockd:merge_opts/2' is called if the CRL
%% options were previously enabled.
WasEnabled = emqx_utils_maps:deep_get([ssl_options, enable_crl_check], OldOpts, false),
Undefine = fun(Acc, K) -> emqx_utils_maps:put_if(Acc, K, undefined, WasEnabled) end,
SSLOpts1 = Undefine(SSLOpts0, crl_check),
SSLOpts = Undefine(SSLOpts1, crl_cache),
Conf0#{ssl_options := SSLOpts};
inject_crl_config(Conf, undefined = _OldOpts) ->
Conf.
maybe_unregister_ocsp_stapling_refresh(
@ -1018,7 +1038,6 @@ ensure_max_conns(<<"infinity">>) -> <<"infinity">>;
ensure_max_conns(MaxConn) when is_binary(MaxConn) -> binary_to_integer(MaxConn);
ensure_max_conns(MaxConn) -> MaxConn.
-spec quic_listen_on(X :: any()) -> quicer:listen_on().
quic_listen_on(Bind) ->
case Bind of
{Addr, Port} when tuple_size(Addr) == 4 ->

View File

@ -25,7 +25,7 @@
-export([start_link/0]).
%% throttler API
-export([allow/1]).
-export([allow/2]).
%% gen_server callbacks
-export([
@ -40,23 +40,29 @@
-define(SEQ_ID(Msg), {?MODULE, Msg}).
-define(NEW_SEQ, atomics:new(1, [{signed, false}])).
-define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)).
-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))).
-define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)).
-define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)).
-define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1).
-define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1).
-define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)).
-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
-spec allow(atom()) -> boolean().
allow(Msg) when is_atom(Msg) ->
%% @doc Check whether a throttled log message is allowed to pass down to the logger this time.
%% `Msg' must be an atom, and the second argument `UniqueKey' should be `undefined'
%% for predefined message IDs.
%% For relatively static resources created from configuration, such as data integration
%% resource IDs, `UniqueKey' should be of `binary()' type.
-spec allow(atom(), undefined | binary()) -> boolean().
allow(Msg, UniqueKey) when
is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined)
->
case emqx_logger:get_primary_log_level() of
debug ->
true;
_ ->
do_allow(Msg)
do_allow(Msg, UniqueKey)
end.
-spec start_link() -> startlink_ret().
@ -68,7 +74,8 @@ start_link() ->
%%--------------------------------------------------------------------
init([]) ->
ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
process_flag(trap_exit, true),
ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST),
CurrentPeriodMs = ?TIME_WINDOW_MS,
TimerRef = schedule_refresh(CurrentPeriodMs),
{ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
@ -86,16 +93,22 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
DroppedStats = lists:foldl(
fun(Msg, Acc) ->
case ?GET_SEQ(Msg) of
%% Should not happen, unless the static ids list is updated at run-time.
undefined ->
?NEW_THROTTLE(Msg, ?NEW_SEQ),
%% Should not happen, unless the static ids list is updated at run-time.
new_throttler(Msg),
?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
Acc;
SeqMap when is_map(SeqMap) ->
maps:fold(
fun(Key, Ref, Acc0) ->
ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]),
drop_stats(Ref, ID, Acc0)
end,
Acc,
SeqMap
);
SeqRef ->
Dropped = ?GET_DROPPED(SeqRef),
ok = ?RESET_SEQ(SeqRef),
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
maybe_add_dropped(Msg, Dropped, Acc)
drop_stats(SeqRef, Msg, Acc)
end
end,
#{},
@ -112,7 +125,16 @@ handle_info(Info, State) ->
?SLOG(error, #{msg => "unxpected_info", info => Info}),
{noreply, State}.
drop_stats(SeqRef, Msg, Acc) ->
Dropped = ?GET_DROPPED(SeqRef),
ok = ?RESET_SEQ(SeqRef),
?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
maybe_add_dropped(Msg, Dropped, Acc).
terminate(_Reason, _State) ->
%% Atomics do not have a delete/remove/release/deallocate API;
%% the resource is released once the reference is garbage-collected.
lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST),
ok.
code_change(_OldVsn, State, _Extra) ->
@ -122,17 +144,27 @@ code_change(_OldVsn, State, _Extra) ->
%% internal functions
%%--------------------------------------------------------------------
do_allow(Msg) ->
do_allow(Msg, UniqueKey) ->
case persistent_term:get(?SEQ_ID(Msg), undefined) of
undefined ->
%% This is either a race condition (emqx_log_throttler is not started yet)
%% or a developer mistake (the msg used in the ?SLOG_THROTTLE/2,3 macro is
%% not added to the default value of `log.throttling.msgs`).
?SLOG(info, #{
msg => "missing_log_throttle_sequence",
?SLOG(debug, #{
msg => "log_throttle_disabled",
throttled_msg => Msg
}),
true;
%% e.g. an unrecoverable msg throttled per resource_id
SeqMap when is_map(SeqMap) ->
case maps:find(UniqueKey, SeqMap) of
{ok, SeqRef} ->
?IS_ALLOWED(SeqRef);
error ->
SeqRef = ?NEW_SEQ,
new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}),
true
end;
SeqRef ->
?IS_ALLOWED(SeqRef)
end.
@ -154,3 +186,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) ->
schedule_refresh(PeriodMs) ->
?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}),
erlang:send_after(PeriodMs, ?MODULE, refresh).
new_throttler(unrecoverable_resource_error = Msg) ->
new_throttler(Msg, #{});
new_throttler(Msg) ->
new_throttler(Msg, ?NEW_SEQ).
new_throttler(Msg, AtomicOrEmptyMap) ->
persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap).
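%% A hedged usage sketch for the extended allow/2 API above. Only the
%% `unrecoverable_resource_error' message id comes from this change; the resource id
%% and the caller function are made-up illustration:
%%   maybe_log_resource_error(ResId, Reason) when is_binary(ResId) ->
%%       case emqx_log_throttler:allow(unrecoverable_resource_error, ResId) of
%%           true -> ?SLOG(error, #{msg => unrecoverable_resource_error,
%%                                  resource_id => ResId, reason => Reason});
%%           false -> ok
%%       end.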

View File

@ -105,7 +105,7 @@ format(Msg, Meta, Config) ->
maybe_format_msg(undefined, _Meta, _Config) ->
#{};
maybe_format_msg({report, Report0} = Msg, #{report_cb := Cb} = Meta, Config) ->
Report = emqx_logger_textfmt:try_encode_payload(Report0, Config),
Report = emqx_logger_textfmt:try_encode_meta(Report0, Config),
case is_map(Report) andalso Cb =:= ?DEFAULT_FORMATTER of
true ->
%% reporting a map without a customised format function

View File

@ -20,7 +20,7 @@
-export([format/2]).
-export([check_config/1]).
-export([try_format_unicode/1, try_encode_payload/2]).
-export([try_format_unicode/1, try_encode_meta/2]).
%% Used in the other log formatters
-export([evaluate_lazy_values_if_dbg_level/1, evaluate_lazy_values/1]).
@ -111,7 +111,7 @@ is_list_report_acceptable(_) ->
enrich_report(ReportRaw0, Meta, Config) ->
%% clientid and peername always in emqx_conn's process metadata.
%% topic and username can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2
ReportRaw = try_encode_payload(ReportRaw0, Config),
ReportRaw = try_encode_meta(ReportRaw0, Config),
Topic =
case maps:get(topic, Meta, undefined) of
undefined -> maps:get(topic, ReportRaw, undefined);
@ -180,9 +180,22 @@ enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) ->
enrich_topic(Msg, _) ->
Msg.
try_encode_payload(#{payload := Payload} = Report, #{payload_encode := Encode}) ->
try_encode_meta(Report, Config) ->
lists:foldl(
fun(Meta, Acc) ->
try_encode_meta(Meta, Acc, Config)
end,
Report,
[payload, packet]
).
try_encode_meta(payload, #{payload := Payload} = Report, #{payload_encode := Encode}) ->
Report#{payload := encode_payload(Payload, Encode)};
try_encode_payload(Report, _Config) ->
try_encode_meta(packet, #{packet := Packet} = Report, #{payload_encode := Encode}) when
is_tuple(Packet)
->
Report#{packet := emqx_packet:format(Packet, Encode)};
try_encode_meta(_, Report, _Config) ->
Report.
encode_payload(Payload, text) ->
@ -190,4 +203,5 @@ encode_payload(Payload, text) ->
encode_payload(_Payload, hidden) ->
"******";
encode_payload(Payload, hex) ->
binary:encode_hex(Payload).
Bin = emqx_utils_conv:bin(Payload),
binary:encode_hex(Bin).
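%% A small sketch of what the emqx_utils_conv:bin/1 conversion above enables
%% (illustrative values; assumes integers are rendered in decimal form):
%%   encode_payload(<<"abc">>, hex)    -> <<"616263">>
%%   encode_payload(123, hex)          -> <<"313233">>   %% non-binary payloads no longer crash
%%   encode_payload(<<"abc">>, hidden) -> "******"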

View File

@ -51,7 +51,6 @@
]).
-export([
format/1,
format/2
]).
@ -481,10 +480,6 @@ will_msg(#mqtt_packet_connect{
headers = #{username => Username, properties => Props}
}.
%% @doc Format packet
-spec format(emqx_types:packet()) -> iolist().
format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()).
%% @doc Format packet
-spec format(emqx_types:packet(), hex | text | hidden) -> iolist().
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) ->

View File

@ -102,7 +102,11 @@ hash({SimpleHash, _Salt, disable}, Password) when is_binary(Password) ->
hash({SimpleHash, Salt, prefix}, Password) when is_binary(Password), is_binary(Salt) ->
hash_data(SimpleHash, <<Salt/binary, Password/binary>>);
hash({SimpleHash, Salt, suffix}, Password) when is_binary(Password), is_binary(Salt) ->
hash_data(SimpleHash, <<Password/binary, Salt/binary>>).
hash_data(SimpleHash, <<Password/binary, Salt/binary>>);
hash({_SimpleHash, Salt, _SaltPos}, _Password) when not is_binary(Salt) ->
error({salt_not_string, Salt});
hash({_SimpleHash, _Salt, _SaltPos}, Password) when not is_binary(Password) ->
error({password_not_string, Password}).
-spec hash_data(hash_type(), binary()) -> binary().
hash_data(plain, Data) when is_binary(Data) ->

View File

@ -621,9 +621,13 @@ handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
Session = replay_streams(Session0, ClientInfo),
{ok, [], Session};
handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) ->
S1 = emqx_persistent_session_ds_subs:gc(S0),
S2 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S2, SharedSubS0),
%% The `gc` and `renew_streams` steps may drop unsubscribed streams.
%% The shared subscription handler must get a chance to see unsubscribed streams
%% in the fully replayed state first.
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:pre_renew_streams(S0, SharedSubS0),
S2 = emqx_persistent_session_ds_subs:gc(S1),
S3 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2),
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S3, SharedSubS1),
Interval = get_config(ClientInfo, [renew_streams_interval]),
Session = emqx_session:ensure_timer(
?TIMER_GET_STREAMS,
@ -757,7 +761,7 @@ skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
%%--------------------------------------------------------------------
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
disconnect(Session = #{id := Id, s := S0, shared_sub_s := SharedSubS0}, ConnInfo) ->
S1 = maybe_set_offline_info(S0, Id),
S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1),
S3 =
@ -767,8 +771,9 @@ disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
_ ->
S2
end,
S = emqx_persistent_session_ds_state:commit(S3),
{shutdown, Session#{s => S}}.
{S4, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_disconnect(S3, SharedSubS0),
S = emqx_persistent_session_ds_state:commit(S4),
{shutdown, Session#{s => S, shared_sub_s => SharedSubS}}.
-spec terminate(Reason :: term(), session()) -> ok.
terminate(_Reason, Session = #{id := Id, s := S}) ->
@ -816,10 +821,12 @@ list_client_subscriptions(ClientId) ->
{error, not_found}
end.
-spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) ->
-spec get_client_subscription(emqx_types:clientid(), topic_filter() | share_topic_filter()) ->
subscription() | undefined.
get_client_subscription(ClientId, Topic) ->
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic).
get_client_subscription(ClientId, #share{} = ShareTopicFilter) ->
emqx_persistent_session_ds_shared_subs:cold_get_subscription(ClientId, ShareTopicFilter);
get_client_subscription(ClientId, TopicFilter) ->
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, TopicFilter).
%%--------------------------------------------------------------------
%% Session tables operations
@ -986,14 +993,14 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
%% Normal replay:
%%--------------------------------------------------------------------
fetch_new_messages(Session0 = #{s := S0}, ClientInfo) ->
LFS = maps:get(last_fetched_stream, Session0, beginning),
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S0),
fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) ->
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0),
Session1 = Session0#{s => S1, shared_sub_s => SharedSubS1},
LFS = maps:get(last_fetched_stream, Session1, beginning),
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1),
BatchSize = get_config(ClientInfo, [batch_size]),
Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo),
#{s := S1, shared_sub_s := SharedSubS0} = Session1,
{S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0),
Session1#{s => S2, shared_sub_s => SharedSubS1}.
Session2 = fetch_new_messages(ItStream, BatchSize, Session1, ClientInfo),
Session2#{shared_sub_s => SharedSubS1}.
fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
#{inflight := Inflight} = Session0,

View File

@ -17,7 +17,7 @@
-module(emqx_persistent_session_ds_router).
-include("emqx.hrl").
-include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
-include("emqx_ps_ds_int.hrl").
-export([init_tables/0]).
@ -47,7 +47,7 @@
-endif.
-type route() :: #ps_route{}.
-type dest() :: emqx_persistent_session_ds:id().
-type dest() :: emqx_persistent_session_ds:id() | #share_dest{}.
-export_type([dest/0, route/0]).
@ -161,7 +161,7 @@ topics() ->
print_routes(Topic) ->
lists:foreach(
fun(#ps_route{topic = To, dest = Dest}) ->
io:format("~ts -> ~ts~n", [To, Dest])
io:format("~ts -> ~tp~n", [To, Dest])
end,
match_routes(Topic)
).
@ -247,6 +247,8 @@ mk_filtertab_fold_fun(FoldFun) ->
match_filters(Topic) ->
emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []).
get_dest_session_id(#share_dest{session_id = DSSessionId}) ->
DSSessionId;
get_dest_session_id({_, DSSessionId}) ->
DSSessionId;
get_dest_session_id(DSSessionId) ->

View File

@ -2,11 +2,37 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc This module
%% * handles creation and management of _shared_ subscriptions for the session;
%% * provides streams to the session;
%% * handles progress of stream replay.
%%
%% The logic is quite straightforward; most parts resemble the logic of
%% `emqx_persistent_session_ds_subs` (subscribe/unsubscribe) and
%% `emqx_persistent_session_ds_scheduler` (providing new streams),
%% except that some data is sent to or received from the
%% `emqx_persistent_session_ds_shared_subs_agent`, which communicates
%% with remote shared subscription leaders.
%%
%% A tricky part is the concept of "scheduled actions". When we unsubscribe from a topic,
%% some streams may still have unacked messages, so we do not have reliable progress
%% for them. Sending the current progress to the leader and disconnecting would lead
%% to duplicated messages. Therefore, after unsubscription we wait until all such
%% streams are acked, and only then disconnect from the leader.
%%
%% For this purpose we keep the `scheduled_actions` map in the module state.
%% There we track the streams we need to wait for and collect their progress.
%% We also use `scheduled_actions` for resubscriptions. If a client resubscribes quickly
%% after unsubscribing, the mentioned streams may still be unacked. If we abandoned them
%% and simply reconnected to the leader, it might lease us the same streams again with
%% the previous progress, so messages could be duplicated.
-module(emqx_persistent_session_ds_shared_subs).
-include("emqx_mqtt.hrl").
-include("emqx.hrl").
-include("logger.hrl").
-include("session_internals.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-export([
@ -15,16 +41,56 @@
on_subscribe/3,
on_unsubscribe/4,
on_disconnect/2,
on_streams_replayed/2,
on_streams_replay/2,
on_info/3,
pre_renew_streams/2,
renew_streams/2,
to_map/2
]).
%% Management API:
-export([
cold_get_subscription/2
]).
-export([
format_lease_events/1,
format_stream_progresses/1
]).
-define(schedule_subscribe, schedule_subscribe).
-define(schedule_unsubscribe, schedule_unsubscribe).
-type stream_key() :: {emqx_persistent_session_ds:id(), emqx_ds:stream()}.
-type scheduled_action_type() ::
{?schedule_subscribe, emqx_types:subopts()} | ?schedule_unsubscribe.
-type agent_stream_progress() :: #{
stream := emqx_ds:stream(),
progress := progress(),
use_finished := boolean()
}.
-type progress() ::
#{
iterator := emqx_ds:iterator()
}.
-type scheduled_action() :: #{
type := scheduled_action_type(),
stream_keys_to_wait := [stream_key()],
progresses := [agent_stream_progress()]
}.
-type t() :: #{
agent := emqx_persistent_session_ds_shared_subs_agent:t()
agent := emqx_persistent_session_ds_shared_subs_agent:t(),
scheduled_actions := #{
share_topic_filter() => scheduled_action()
}
}.
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{
@ -34,184 +100,90 @@
-define(rank_x, rank_shared).
-define(rank_y, 0).
-export_type([
progress/0,
agent_stream_progress/0
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% new
-spec new(opts()) -> t().
new(Opts) ->
#{
agent => emqx_persistent_session_ds_shared_subs_agent:new(
agent_opts(Opts)
)
),
scheduled_actions => #{}
}.
%%--------------------------------------------------------------------
%% open
-spec open(emqx_persistent_session_ds_state:t(), opts()) ->
{ok, emqx_persistent_session_ds_state:t(), t()}.
open(S, Opts) ->
open(S0, Opts) ->
SharedSubscriptions = fold_shared_subs(
fun(#share{} = TopicFilter, Sub, Acc) ->
[{TopicFilter, to_agent_subscription(S, Sub)} | Acc]
fun(#share{} = ShareTopicFilter, Sub, Acc) ->
[{ShareTopicFilter, to_agent_subscription(S0, Sub)} | Acc]
end,
[],
S
S0
),
Agent = emqx_persistent_session_ds_shared_subs_agent:open(
SharedSubscriptions, agent_opts(Opts)
),
SharedSubS = #{agent => Agent},
{ok, S, SharedSubS}.
SharedSubS = #{agent => Agent, scheduled_actions => #{}},
S1 = revoke_all_streams(S0),
{ok, S1, SharedSubS}.
%%--------------------------------------------------------------------
%% on_subscribe
-spec on_subscribe(
share_topic_filter(),
emqx_types:subopts(),
emqx_persistent_session_ds:session()
) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}.
on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) ->
Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S),
on_subscribe(Subscription, TopicFilter, SubOpts, Session).
-spec on_unsubscribe(
emqx_persistent_session_ds:id(),
emqx_persistent_session_ds:topic_filter(),
emqx_persistent_session_ds_state:t(),
t()
) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}.
on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) ->
case lookup(TopicFilter, S0) of
undefined ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
Subscription ->
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, topic_filter => TopicFilter
}),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent0, TopicFilter
),
SharedSubS = SharedSubS0#{agent => Agent1},
S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0),
{ok, S, SharedSubS, Subscription}
end.
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0
),
?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}),
S1 = lists:foldl(
fun
(#{type := lease} = Event, S) -> accept_stream(Event, S);
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
end,
S0,
StreamLeaseEvents
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S1, SharedSubS1}.
-spec on_streams_replayed(
emqx_persistent_session_ds_state:t(),
t()
) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
%% TODO
%% Is it sufficient for a report?
Progress = fold_shared_stream_states(
fun(TopicFilter, Stream, SRS, Acc) ->
#srs{it_begin = BeginIt} = SRS,
StreamProgress = #{
topic_filter => TopicFilter,
stream => Stream,
iterator => BeginIt
},
[StreamProgress | Acc]
end,
[],
S
),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progress
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
{emqx_persistent_session_ds_state:t(), t()}.
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
to_map(_S, _SharedSubS) ->
%% TODO
#{}.
on_subscribe(#share{} = ShareTopicFilter, SubOpts, #{s := S} = Session) ->
Subscription = emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S),
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% on_subscribe internal functions
fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(
fun
(#share{} = TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0);
(_, _Sub, Acc0) -> Acc0
end,
Acc,
S
).
fold_shared_stream_states(Fun, Acc, S) ->
%% TODO
%% Optimize or cache
TopicFilters = fold_shared_subs(
fun
(#share{} = TopicFilter, #{id := Id} = _Sub, Acc0) ->
Acc0#{Id => TopicFilter};
(_, _, Acc0) ->
Acc0
end,
#{},
S
),
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, Stream}, SRS, Acc0) ->
case TopicFilters of
#{SubId := TopicFilter} ->
Fun(TopicFilter, Stream, SRS, Acc0);
_ ->
Acc0
end
end,
Acc,
S
).
on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
on_subscribe(undefined, ShareTopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
#{max_subscriptions := MaxSubscriptions} = Props,
case emqx_persistent_session_ds_state:n_subscriptions(S) < MaxSubscriptions of
true ->
create_new_subscription(TopicFilter, SubOpts, Session);
create_new_subscription(ShareTopicFilter, SubOpts, Session);
false ->
{error, ?RC_QUOTA_EXCEEDED}
end;
on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
update_subscription(Subscription, TopicFilter, SubOpts, Session).
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) ->
update_subscription(Subscription, ShareTopicFilter, SubOpts, Session).
-dialyzer({nowarn_function, create_new_subscription/3}).
create_new_subscription(TopicFilter, SubOpts, #{
id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props
create_new_subscription(#share{topic = TopicFilter, group = Group} = ShareTopicFilter, SubOpts, #{
id := SessionId,
s := S0,
shared_sub_s := #{agent := Agent} = SharedSubS0,
props := Props
}) ->
case
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent0, TopicFilter, SubOpts
emqx_persistent_session_ds_shared_subs_agent:can_subscribe(
Agent, ShareTopicFilter, SubOpts
)
of
{ok, Agent1} ->
ok ->
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, #share_dest{
session_id = SessionId, group = Group
}),
_ = emqx_external_broker:add_persistent_shared_route(TopicFilter, Group, SessionId),
#{upgrade_qos := UpgradeQoS} = Props,
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
@ -227,20 +199,20 @@ create_new_subscription(TopicFilter, SubOpts, #{
start_time => now_ms()
},
S = emqx_persistent_session_ds_state:put_subscription(
TopicFilter, Subscription, S3
ShareTopicFilter, Subscription, S3
),
SharedSubS = SharedSubS0#{agent => Agent1},
?tp(persistent_session_ds_shared_subscription_added, #{
topic_filter => TopicFilter, session => SessionId
}),
SharedSubS = schedule_subscribe(SharedSubS0, ShareTopicFilter, SubOpts),
{ok, S, SharedSubS};
{error, _} = Error ->
Error
end.
update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilter, SubOpts, #{
s := S0, shared_sub_s := SharedSubS, props := Props
}) ->
update_subscription(
#{current_state := SStateId0, id := SubId} = Sub0, ShareTopicFilter, SubOpts, #{
s := S0, shared_sub_s := SharedSubS, props := Props
}
) ->
#{upgrade_qos := UpgradeQoS} = Props,
SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
@ -254,36 +226,173 @@ update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilt
SStateId, SState, S1
),
Sub = Sub0#{current_state => SStateId},
S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
S = emqx_persistent_session_ds_state:put_subscription(ShareTopicFilter, Sub, S2),
{ok, S, SharedSubS}
end.
lookup(TopicFilter, S) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of
Sub = #{current_state := SStateId} ->
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
#{subopts := SubOpts} ->
Sub#{subopts => SubOpts};
undefined ->
undefined
end;
-dialyzer({nowarn_function, schedule_subscribe/3}).
schedule_subscribe(
#{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0,
ShareTopicFilter,
SubOpts
) ->
case ScheduledActions0 of
#{ShareTopicFilter := ScheduledAction} ->
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
},
?tp(debug, shared_subs_schedule_subscribe_override, #{
share_topic_filter => ShareTopicFilter,
new_type => {?schedule_subscribe, SubOpts},
old_action => format_schedule_action(ScheduledAction)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
?tp(debug, shared_subs_schedule_subscribe_new, #{
share_topic_filter => ShareTopicFilter, subopts => SubOpts
}),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent0, ShareTopicFilter, SubOpts
),
SharedSubS0#{agent => Agent1}
end.
%%--------------------------------------------------------------------
%% on_unsubscribe
-spec on_unsubscribe(
emqx_persistent_session_ds:id(),
share_topic_filter(),
emqx_persistent_session_ds_state:t(),
t()
) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}.
on_unsubscribe(
SessionId, #share{topic = TopicFilter, group = Group} = ShareTopicFilter, S0, SharedSubS0
) ->
case lookup(ShareTopicFilter, S0) of
undefined ->
undefined
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
#{id := SubId} = Subscription ->
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, share_topic_filter => ShareTopicFilter
}),
_ = emqx_external_broker:delete_persistent_shared_route(TopicFilter, Group, SessionId),
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, #share_dest{
session_id = SessionId, group = Group
}),
S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0),
SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter),
{ok, S, SharedSubS, Subscription}
end.
%%--------------------------------------------------------------------
%% on_unsubscribe internal functions
schedule_unsubscribe(
S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscribedSubId, ShareTopicFilter
) ->
case ScheduledActions0 of
#{ShareTopicFilter := ScheduledAction0} ->
ScheduledAction1 = ScheduledAction0#{type => ?schedule_unsubscribe},
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => ScheduledAction1
},
?tp(debug, shared_subs_schedule_unsubscribe_override, #{
share_topic_filter => ShareTopicFilter,
new_type => ?schedule_unsubscribe,
old_action => format_schedule_action(ScheduledAction0)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
StreamKeys = stream_keys_by_sub_id(S, UnsubscribedSubId),
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => #{
type => ?schedule_unsubscribe,
stream_keys_to_wait => StreamKeys,
progresses => []
}
},
?tp(debug, shared_subs_schedule_unsubscribe_new, #{
share_topic_filter => ShareTopicFilter,
stream_keys => format_stream_keys(StreamKeys)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1}
end.
%%--------------------------------------------------------------------
%% pre_renew_streams
-spec pre_renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
pre_renew_streams(S, SharedSubS) ->
on_streams_replay(S, SharedSubS).
%%--------------------------------------------------------------------
%% renew_streams
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = SharedSubS0) ->
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0
),
StreamLeaseEvents =/= [] andalso
?tp(debug, shared_subs_new_stream_lease_events, #{
stream_lease_events => format_lease_events(StreamLeaseEvents)
}),
S1 = lists:foldl(
fun
(#{type := lease} = Event, S) -> accept_stream(Event, S, ScheduledActions);
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
end,
S0,
StreamLeaseEvents
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S1, SharedSubS1}.
%%--------------------------------------------------------------------
%% renew_streams internal functions
accept_stream(#{share_topic_filter := ShareTopicFilter} = Event, S, ScheduledActions) ->
%% If we have a pending action (subscribe or unsubscribe) for this topic filter,
%% we should not accept a stream and start replaying it. We won't use it anyway:
%% * if a subscribe is pending, we will reset the agent and obtain a new lease
%% * if an unsubscribe is pending, we will drop the connection
case ScheduledActions of
#{ShareTopicFilter := _Action} ->
S;
_ ->
accept_stream(Event, S)
end.
accept_stream(
#{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0
#{
share_topic_filter := ShareTopicFilter,
stream := Stream,
progress := #{iterator := Iterator} = _Progress
} = _Event,
S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
undefined ->
%% This should not happen.
%% Agent should have received unsubscribe callback
%% and should not have passed this stream as a new one
error(new_stream_without_sub);
%% We unsubscribed
S0;
#{id := SubId, current_state := SStateId} ->
Key = {SubId, Stream},
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
undefined ->
NeedCreateStream =
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
undefined ->
true;
#srs{unsubscribed = true} ->
true;
_SRS ->
false
end,
case NeedCreateStream of
true ->
NewSRS =
#srs{
rank_x = ?rank_x,
@ -294,15 +403,15 @@ accept_stream(
},
S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0),
S1;
_SRS ->
false ->
S0
end
end.
revoke_stream(
#{topic_filter := TopicFilter, stream := Stream}, S0
#{share_topic_filter := ShareTopicFilter, stream := Stream}, S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
undefined ->
%% This should not happen.
%% Agent should have received unsubscribe callback
@ -320,19 +429,363 @@ revoke_stream(
end
end.
-spec to_agent_subscription(
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()
%%--------------------------------------------------------------------
%% on_streams_replay
-spec on_streams_replay(
emqx_persistent_session_ds_state:t(),
t()
) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replay(S0, SharedSubS0) ->
{S1, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS1} =
renew_streams(S0, SharedSubS0),
Progresses = all_stream_progresses(S1, Agent0),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progresses
),
{Agent2, ScheduledActions1} = run_scheduled_actions(S1, Agent1, ScheduledActions0),
SharedSubS2 = SharedSubS1#{
agent => Agent2,
scheduled_actions => ScheduledActions1
},
{S1, SharedSubS2}.
%%--------------------------------------------------------------------
%% on_streams_replay internal functions
all_stream_progresses(S, Agent) ->
all_stream_progresses(S, Agent, _NeedUnacked = false).
all_stream_progresses(S, _Agent, NeedUnacked) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
fold_shared_stream_states(
fun(ShareTopicFilter, Stream, SRS, ProgressesAcc0) ->
case
is_stream_started(CommQos1, CommQos2, SRS) and
(NeedUnacked or is_stream_fully_acked(CommQos1, CommQos2, SRS))
of
true ->
StreamProgress = stream_progress(CommQos1, CommQos2, Stream, SRS),
maps:update_with(
ShareTopicFilter,
fun(Progresses) -> [StreamProgress | Progresses] end,
[StreamProgress],
ProgressesAcc0
);
false ->
ProgressesAcc0
end
end,
#{},
S
).
run_scheduled_actions(S, Agent, ScheduledActions) ->
maps:fold(
fun(ShareTopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) ->
case run_scheduled_action(S, AgentAcc0, ShareTopicFilter, Action0) of
{ok, AgentAcc1} ->
{AgentAcc1, maps:remove(ShareTopicFilter, ScheduledActionsAcc)};
{continue, Action1} ->
{AgentAcc0, ScheduledActionsAcc#{ShareTopicFilter => Action1}}
end
end,
{Agent, ScheduledActions},
ScheduledActions
).
run_scheduled_action(
S,
Agent0,
ShareTopicFilter,
#{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action
) ->
emqx_persistent_session_ds_shared_subs_agent:subscription().
to_agent_subscription(_S, Subscription) ->
StreamKeysToWait1 = filter_unfinished_streams(S, StreamKeysToWait0),
Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0,
case StreamKeysToWait1 of
[] ->
?tp(debug, shared_subs_schedule_action_complete, #{
share_topic_filter => ShareTopicFilter,
progresses => format_stream_progresses(Progresses1),
type => Type
}),
%% Regular progress won't see unsubscribed streams, so we need to
%% send the progress explicitly.
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, #{ShareTopicFilter => Progresses1}
),
case Type of
{?schedule_subscribe, SubOpts} ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent1, ShareTopicFilter, SubOpts
)};
?schedule_unsubscribe ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent1, ShareTopicFilter, Progresses1
)}
end;
_ ->
Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
?tp(debug, shared_subs_schedule_action_continue, #{
share_topic_filter => ShareTopicFilter,
new_action => format_schedule_action(Action1)
}),
{continue, Action1}
end.
filter_unfinished_streams(S, StreamKeysToWait) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
lists:filter(
fun(Key) ->
case emqx_persistent_session_ds_state:get_stream(Key, S) of
undefined ->
%% This should not happen: every stream should be seen
%% in the completed state before deletion
true;
SRS ->
not is_stream_fully_acked(CommQos1, CommQos2, SRS)
end
end,
StreamKeysToWait
).
stream_progresses(S, StreamKeys) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
lists:map(
fun({_SubId, Stream} = Key) ->
SRS = emqx_persistent_session_ds_state:get_stream(Key, S),
stream_progress(CommQos1, CommQos2, Stream, SRS)
end,
StreamKeys
).
%%--------------------------------------------------------------------
%% on_disconnect
on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
S1 = revoke_all_streams(S0),
Progresses = all_stream_progresses(S1, Agent0, _NeedUnacked = true),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses),
SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}},
{S1, SharedSubS1}.
%%--------------------------------------------------------------------
%% on_disconnect helpers
revoke_all_streams(S0) ->
fold_shared_stream_states(
fun(ShareTopicFilter, Stream, _SRS, S) ->
revoke_stream(#{share_topic_filter => ShareTopicFilter, stream => Stream}, S)
end,
S0,
S0
).
%%--------------------------------------------------------------------
%% on_info
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
{emqx_persistent_session_ds_state:t(), t()}.
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
%%--------------------------------------------------------------------
%% to_map
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
to_map(S, _SharedSubS) ->
fold_shared_subs(
fun(ShareTopicFilter, _, Acc) -> Acc#{ShareTopicFilter => lookup(ShareTopicFilter, S)} end,
#{},
S
).
%%--------------------------------------------------------------------
%% cold_get_subscription
-spec cold_get_subscription(emqx_persistent_session_ds:id(), share_topic_filter()) ->
emqx_persistent_session_ds:subscription() | undefined.
cold_get_subscription(SessionId, ShareTopicFilter) ->
case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, ShareTopicFilter) of
[Sub = #{current_state := SStateId}] ->
case
emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId)
of
[#{subopts := Subopts}] ->
Sub#{subopts => Subopts};
_ ->
undefined
end;
_ ->
undefined
end.
%%--------------------------------------------------------------------
%% Generic helpers
%%--------------------------------------------------------------------
lookup(ShareTopicFilter, S) ->
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S) of
Sub = #{current_state := SStateId} ->
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
#{subopts := SubOpts} ->
Sub#{subopts => SubOpts};
undefined ->
undefined
end;
undefined ->
undefined
end.
stream_keys_by_sub_id(S, MatchSubId) ->
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, _Stream} = StreamKey, _SRS, StreamKeys) ->
case SubId of
MatchSubId ->
[StreamKey | StreamKeys];
_ ->
StreamKeys
end
end,
[],
S
).
stream_progress(
CommQos1,
CommQos2,
Stream,
#srs{
it_end = EndIt,
it_begin = BeginIt
} = SRS
) ->
Iterator =
case is_stream_fully_acked(CommQos1, CommQos2, SRS) of
true -> EndIt;
false -> BeginIt
end,
#{
stream => Stream,
progress => #{
iterator => Iterator
},
use_finished => is_use_finished(SRS)
}.
fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(
fun
(#share{} = ShareTopicFilter, Sub, Acc0) -> Fun(ShareTopicFilter, Sub, Acc0);
(_, _Sub, Acc0) -> Acc0
end,
Acc,
S
).
fold_shared_stream_states(Fun, Acc, S) ->
%% TODO
%% do we need anything from sub state?
%% Optimize or cache
ShareTopicFilters = fold_shared_subs(
fun
(#share{} = ShareTopicFilter, #{id := Id} = _Sub, Acc0) ->
Acc0#{Id => ShareTopicFilter};
(_, _, Acc0) ->
Acc0
end,
#{},
S
),
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, Stream}, SRS, Acc0) ->
case ShareTopicFilters of
#{SubId := ShareTopicFilter} ->
Fun(ShareTopicFilter, Stream, SRS, Acc0);
_ ->
Acc0
end
end,
Acc,
S
).
to_agent_subscription(_S, Subscription) ->
maps:with([start_time], Subscription).
-spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts().
agent_opts(#{session_id := SessionId}) ->
#{session_id => SessionId}.
-dialyzer({nowarn_function, now_ms/0}).
now_ms() ->
erlang:system_time(millisecond).
is_use_finished(#srs{unsubscribed = Unsubscribed}) ->
Unsubscribed.
is_stream_started(CommQos1, CommQos2, #srs{first_seqno_qos1 = Q1, last_seqno_qos1 = Q2}) ->
(CommQos1 >= Q1) or (CommQos2 >= Q2).
is_stream_fully_acked(_, _, #srs{
first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2
}) ->
%% Streams whose last chunk doesn't contain any QoS 1 or QoS 2
%% messages are considered fully acked:
true;
is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
(Comm1 >= S1) andalso (Comm2 >= S2).
%%--------------------------------------------------------------------
%% Formatters
%%--------------------------------------------------------------------
format_schedule_action(#{
type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait
}) ->
#{
type => Type,
progresses => format_stream_progresses(Progresses),
stream_keys_to_wait => format_stream_keys(StreamKeysToWait)
}.
format_stream_progresses(Streams) ->
lists:map(
fun format_stream_progress/1,
Streams
).
format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
format_progress(#{iterator := Iterator} = Progress) ->
Progress#{iterator => format_opaque(Iterator)}.
format_stream_key(beginning) -> beginning;
format_stream_key({SubId, Stream}) -> {SubId, format_opaque(Stream)}.
format_stream_keys(StreamKeys) ->
lists:map(
fun format_stream_key/1,
StreamKeys
).
format_lease_events(Events) ->
lists:map(
fun format_lease_event/1,
Events
).
format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
format_lease_event(#{stream := Stream} = Event) ->
Event#{stream => format_opaque(Stream)}.
format_opaque(Opaque) ->
erlang:phash2(Opaque).


@ -15,7 +15,7 @@
}.
-type t() :: term().
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{
session_id := session_id()
@ -28,41 +28,44 @@
-type stream_lease() :: #{
type => lease,
%% Used as "external" subscription_id
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
}.
-type stream_revoke() :: #{
type => revoke,
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream()
}.
-type stream_lease_event() :: stream_lease() | stream_revoke().
-type stream_progress() :: #{
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
iterator := emqx_ds:iterator(),
use_finished := boolean()
}.
-export_type([
t/0,
subscription/0,
session_id/0,
stream_lease/0,
stream_lease_event/0,
opts/0
]).
-export([
new/1,
open/2,
can_subscribe/3,
on_subscribe/3,
on_unsubscribe/2,
on_unsubscribe/3,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
@ -77,12 +80,13 @@
%%--------------------------------------------------------------------
-callback new(opts()) -> t().
-callback open([{topic_filter(), subscription()}], opts()) -> t().
-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
{ok, t()} | {error, term()}.
-callback on_unsubscribe(t(), topic_filter()) -> t().
-callback open([{share_topic_filter(), subscription()}], opts()) -> t().
-callback can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
-callback on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
-callback on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
-callback on_disconnect(t(), [stream_progress()]) -> t().
-callback renew_streams(t()) -> {[stream_lease_event()], t()}.
-callback on_stream_progress(t(), [stream_progress()]) -> t().
-callback on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
-callback on_info(t(), term()) -> t().
%%--------------------------------------------------------------------
@ -93,24 +97,31 @@
new(Opts) ->
?shared_subs_agent:new(Opts).
-spec open([{topic_filter(), subscription()}], opts()) -> t().
-spec open([{share_topic_filter(), subscription()}], opts()) -> t().
open(Topics, Opts) ->
?shared_subs_agent:open(Topics, Opts).
-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
{ok, t()} | {error, emqx_types:reason_code()}.
on_subscribe(Agent, TopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts).
-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
can_subscribe(Agent, ShareTopicFilter, SubOpts) ->
?shared_subs_agent:can_subscribe(Agent, ShareTopicFilter, SubOpts).
-spec on_unsubscribe(t(), topic_filter()) -> t().
on_unsubscribe(Agent, TopicFilter) ->
?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
on_subscribe(Agent, ShareTopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, ShareTopicFilter, SubOpts).
-spec on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses) ->
?shared_subs_agent:on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses).
-spec on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
on_disconnect(Agent, StreamProgresses) ->
?shared_subs_agent:on_disconnect(Agent, StreamProgresses).
-spec renew_streams(t()) -> {[stream_lease_event()], t()}.
renew_streams(Agent) ->
?shared_subs_agent:renew_streams(Agent).
-spec on_stream_progress(t(), [stream_progress()]) -> t().
-spec on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
on_stream_progress(Agent, StreamProgress) ->
?shared_subs_agent:on_stream_progress(Agent, StreamProgress).
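For illustration only (not part of the diff): a minimal sketch of the argument shape the updated on_stream_progress/2 and on_disconnect/2 callbacks expect, built from the types above; the share topic filter, stream and iterator values are placeholders.
%% Hypothetical helper; progresses are grouped per share_topic_filter
%% instead of being passed as a flat list.
example_progress_map(ShareTopicFilter, Stream, Iterator) ->
    #{
        ShareTopicFilter => [
            #{
                share_topic_filter => ShareTopicFilter,
                stream => Stream,
                iterator => Iterator,
                use_finished => false
            }
        ]
    }.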


@ -9,11 +9,13 @@
-export([
new/1,
open/2,
can_subscribe/3,
on_subscribe/3,
on_unsubscribe/2,
on_unsubscribe/3,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
@ -30,10 +32,16 @@ new(_Opts) ->
open(_Topics, _Opts) ->
undefined.
on_subscribe(_Agent, _TopicFilter, _SubOpts) ->
can_subscribe(_Agent, _TopicFilter, _SubOpts) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}.
on_unsubscribe(Agent, _TopicFilter) ->
on_subscribe(Agent, _TopicFilter, _SubOpts) ->
Agent.
on_unsubscribe(Agent, _TopicFilter, _Progresses) ->
Agent.
on_disconnect(Agent, _) ->
Agent.
renew_streams(Agent) ->


@ -399,7 +399,9 @@ new_id(Rec) ->
get_subscription(TopicFilter, Rec) ->
gen_get(?subscriptions, TopicFilter, Rec).
-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
-spec cold_get_subscription(
emqx_persistent_session_ds:id(), emqx_types:topic() | emqx_types:share()
) ->
[emqx_persistent_session_ds_subs:subscription()].
cold_get_subscription(SessionId, Topic) ->
kv_pmap_read(?subscription_tab, SessionId, Topic).


@ -21,7 +21,7 @@
-record(ps_route, {
topic :: binary(),
dest :: emqx_persistent_session_ds:id() | '_'
dest :: emqx_persistent_session_ds_router:dest() | '_'
}).
-record(ps_routeidx, {


@ -21,6 +21,7 @@
%% Till full implementation we need to dispatch to the null agent.
%% It will report "not implemented" error for attempts to use shared subscriptions.
-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
% -define(shared_subs_agent, emqx_ds_shared_sub_agent).
%% end of -ifdef(TEST).
-endif.


@ -0,0 +1,40 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_post_upgrade).
%% Example of a hot upgrade callback function.
%% PR#12765
% -export([
% pr12765_update_stats_timer/1,
% pr20000_ensure_sup_started/3
% ]).
%% Please ensure that every callback function is reentrant.
%% This way, users can attempt upgrade multiple times if an issue arises.
%%
% pr12765_update_stats_timer(_FromVsn) ->
% emqx_stats:update_interval(broker_stats, fun emqx_broker_helper:stats_fun/0).
%
% pr20000_ensure_sup_started(_FromVsn, "5.6.1" ++ _, ChildSpec) ->
% ChildId = maps:get(id, ChildSpec),
% case supervisor:terminate_child(emqx_sup, ChildId) of
% ok -> supervisor:delete_child(emqx_sup, ChildId);
% Error -> Error
% end,
% supervisor:start_child(emqx_sup, ChildSpec);
% pr20000_ensure_sup_started(_FromVsn, _TargetVsn, _) ->
% ok.


@ -62,7 +62,7 @@
streams := [{pid(), quicer:stream_handle()}],
%% New stream opts
stream_opts := map(),
%% If conneciton is resumed from session ticket
%% If connection is resumed from session ticket
is_resumed => boolean(),
%% mqtt message serializer config
serialize => undefined,
@ -70,8 +70,8 @@
}.
-type cb_ret() :: quicer_lib:cb_ret().
%% @doc Data streams initializions are started in parallel with control streams, data streams are blocked
%% for the activation from control stream after it is accepted as a legit conneciton.
%% @doc Data streams initializations are started in parallel with control streams, data streams are blocked
%% for the activation from control stream after it is accepted as a legit connection.
%% For security, the initial number of allowed data streams from client should be limited by
%% 'peer_bidi_stream_count` & 'peer_unidi_stream_count`
-spec activate_data_streams(pid(), {
@ -80,7 +80,7 @@
activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
%% @doc conneciton owner init callback
%% @doc connection owner init callback
-spec init(map()) -> {ok, cb_state()}.
init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)});


@ -1,42 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_relup).
%% NOTE: DO NOT remove this `-include`.
%% We use this to force this module to be upgraded every release.
-include("emqx_release.hrl").
-export([
post_release_upgrade/2,
post_release_downgrade/2
]).
-define(INFO(FORMAT), io:format("[emqx_relup] " ++ FORMAT ++ "~n")).
-define(INFO(FORMAT, ARGS), io:format("[emqx_relup] " ++ FORMAT ++ "~n", ARGS)).
%% What to do after upgraded from an old release vsn.
post_release_upgrade(FromRelVsn, _) ->
?INFO("emqx has been upgraded from ~s to ~s!", [FromRelVsn, emqx_release:version()]),
reload_components().
%% What to do after downgraded to an old release vsn.
post_release_downgrade(ToRelVsn, _) ->
?INFO("emqx has been downgraded from ~s to ~s!", [emqx_release:version(), ToRelVsn]),
reload_components().
reload_components() ->
ok.


@ -137,7 +137,7 @@ maybe_badrpc(Delivery) ->
Delivery.
max_client_num() ->
emqx:get_config([rpc, tcp_client_num], ?DefaultClientNum).
emqx:get_config([rpc, client_num], ?DefaultClientNum).
-spec unwrap_erpc(emqx_rpc:erpc(A) | [emqx_rpc:erpc(A)]) -> A | {error, _Err} | list().
unwrap_erpc(Res) when is_list(Res) ->


@ -63,6 +63,7 @@
-type json_binary() :: binary().
-type template() :: binary().
-type template_str() :: string().
-type binary_kv() :: #{binary() => binary()}.
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
@ -167,7 +168,8 @@
json_binary/0,
port_number/0,
template/0,
template_str/0
template_str/0,
binary_kv/0
]).
-export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
@ -319,6 +321,11 @@ roots(low) ->
sc(
ref("crl_cache"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
{banned,
sc(
ref("banned"),
#{importance => ?IMPORTANCE_HIDDEN}
)}
].
@ -344,6 +351,7 @@ fields("authz_cache") ->
#{
default => true,
required => true,
importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(fields_cache_enable)
}
)},
@ -380,6 +388,7 @@ fields("flapping_detect") ->
boolean(),
#{
default => false,
%% importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(flapping_detect_enable)
}
)},
@ -416,6 +425,7 @@ fields("force_shutdown") ->
boolean(),
#{
default => true,
importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(force_shutdown_enable)
}
)},
@ -445,6 +455,7 @@ fields("overload_protection") ->
boolean(),
#{
desc => ?DESC(overload_protection_enable),
%% importance => ?IMPORTANCE_NO_DOC,
default => false
}
)},
@ -505,7 +516,11 @@ fields("force_gc") ->
{"enable",
sc(
boolean(),
#{default => true, desc => ?DESC(force_gc_enable)}
#{
default => true,
importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(force_gc_enable)
}
)},
{"count",
sc(
@ -1658,6 +1673,7 @@ fields("durable_sessions") ->
sc(
boolean(), #{
desc => ?DESC(durable_sessions_enable),
%% importance => ?IMPORTANCE_NO_DOC,
default => false
}
)},
@ -1762,6 +1778,17 @@ fields("client_attrs_init") ->
desc => ?DESC("client_attrs_init_set_as_attr"),
validator => fun restricted_string/1
})}
];
fields("banned") ->
[
{bootstrap_file,
sc(
binary(),
#{
desc => ?DESC("banned_bootstrap_file"),
require => false
}
)}
].
compile_variform(undefined, _Opts) ->
@ -1870,6 +1897,7 @@ base_listener(Bind) ->
#{
default => true,
aliases => [enabled],
importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(fields_listener_enabled)
}
)},
@ -2101,6 +2129,8 @@ desc(durable_storage) ->
?DESC(durable_storage);
desc("client_attrs_init") ->
?DESC(client_attrs_init);
desc("banned") ->
"Banned .";
desc(_) ->
undefined.
@ -2396,6 +2426,7 @@ client_ssl_opts_schema(Defaults) ->
boolean(),
#{
default => false,
%% importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(client_ssl_opts_schema_enable)
}
)},


@ -589,6 +589,14 @@ ensure_valid_options(Options, Versions) ->
ensure_valid_options([], _, Acc) ->
lists:reverse(Acc);
ensure_valid_options([{K, undefined} | T], Versions, Acc) when
K =:= crl_check;
K =:= crl_cache
->
%% Note: we must set crl options to `undefined' to unset them. Otherwise,
%% `esockd' will retain such options when `esockd:merge_opts/2' is called and the SSL
%% options were previously enabled.
ensure_valid_options(T, Versions, [{K, undefined} | Acc]);
ensure_valid_options([{_, undefined} | T], Versions, Acc) ->
ensure_valid_options(T, Versions, Acc);
ensure_valid_options([{_, ""} | T], Versions, Acc) ->
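A minimal sketch of the merge behaviour the note above refers to, assuming esockd-style proplist merging where the second argument overrides the first; the option values are placeholders.
%% Illustrative only: keeping {crl_check, undefined} lets the merge override
%% the previously enabled value, while omitting the key would retain it.
crl_unset_example() ->
    Old = [{crl_check, peer}, {verify, verify_peer}],
    esockd:merge_opts(Old, [{crl_check, undefined}]).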


@ -17,7 +17,6 @@
-include("emqx_mqtt.hrl").
-export([format/2]).
-export([format_meta_map/1]).
%% logger_formatter:config/0 is not exported.
-type config() :: map().
@ -43,10 +42,6 @@ format(
format(Event, Config) ->
emqx_logger_textfmt:format(Event, Config).
format_meta_map(Meta) ->
Encode = emqx_trace_handler:payload_encode(),
format_meta_map(Meta, Encode).
format_meta_map(Meta, Encode) ->
format_meta_map(Meta, Encode, [
{packet, fun format_packet/2},


@ -436,6 +436,7 @@ websocket_handle({Frame, _}, State) ->
%% TODO: should not close the ws connection
?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
shutdown(unexpected_ws_frame, State).
websocket_info({call, From, Req}, State) ->
handle_call(From, Req, State);
websocket_info({cast, rate_limit}, State) ->
@ -737,7 +738,8 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
input_bytes => Data
}),
FrameError = {frame_error, Reason},
{[{incoming, FrameError} | Packets], State};
NState = enrich_state(Reason, State),
{[{incoming, FrameError} | Packets], NState};
error:Reason:Stacktrace ->
?LOG(error, #{
at_state => emqx_frame:describe_state(ParseState),
@ -830,7 +832,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
?LOG(warning, #{
msg => "packet_discarded",
reason => "frame_too_large",
packet => emqx_packet:format(Packet)
packet => Packet
}),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),
@ -1069,6 +1071,13 @@ check_max_connection(Type, Listener) ->
{denny, Reason}
end
end.
enrich_state(#{parse_state := NParseState}, State) ->
Serialize = emqx_frame:serialize_opts(NParseState),
State#state{parse_state = NParseState, serialize = Serialize};
enrich_state(_, State) ->
State.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------


@ -0,0 +1,4 @@
as,who,reason,at,until,by
clientid,c1,right,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
username,u1,reason 1,abc,2025-10-25T21:53:47+08:00,boot
usernamx,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot


@ -0,0 +1,3 @@
as,who,reason,at,until,by
clientid,c1,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
username,u1,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot


@ -0,0 +1,3 @@
as,who,reason,at,until,by
clientid,c2,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
username,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot


@ -0,0 +1,3 @@
as,who,reason,at,until,by
clientid,c1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,
username,u1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,


@ -0,0 +1,3 @@
as,who
clientid,c1
username,u1


@ -254,6 +254,45 @@ t_session_taken(_) ->
{ok, #{}, [0]} = emqtt:unsubscribe(C3, Topic),
ok = emqtt:disconnect(C3).
t_full_bootstrap_file(_) ->
emqx_banned:clear(),
?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full.csv">>))),
FullDatas = lists:sort([
{banned, {username, <<"u1">>}, <<"boot">>, <<"reason 2">>, 1635170027, 1761400427},
{banned, {clientid, <<"c1">>}, <<"boot">>, <<"reason 1">>, 1635170027, 1761400427}
]),
?assertMatch(FullDatas, lists:sort(get_banned_list())),
?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full2.csv">>))),
?assertMatch(FullDatas, lists:sort(get_banned_list())),
ok.
t_optional_bootstrap_file(_) ->
emqx_banned:clear(),
?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"optional.csv">>))),
Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]),
?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])),
ok.
t_omitted_bootstrap_file(_) ->
emqx_banned:clear(),
?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"omitted.csv">>))),
Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]),
?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])),
ok.
t_error_bootstrap_file(_) ->
emqx_banned:clear(),
?assertEqual(
{error, enoent}, emqx_banned:init_from_csv(mk_bootstrap_file(<<"not_exists.csv">>))
),
?assertEqual(
ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"error.csv">>))
),
Keys = [{clientid, <<"c1">>}],
?assertMatch(Keys, [element(2, Data) || Data <- get_banned_list()]),
ok.
receive_messages(Count) ->
receive_messages(Count, []).
receive_messages(0, Msgs) ->
@ -269,3 +308,17 @@ receive_messages(Count, Msgs) ->
after 1200 ->
Msgs
end.
mk_bootstrap_file(File) ->
Dir = code:lib_dir(emqx, test),
filename:join([Dir, <<"data/banned">>, File]).
get_banned_list() ->
Tabs = emqx_banned:tables(),
lists:foldl(
fun(Tab, Acc) ->
Acc ++ ets:tab2list(Tab)
end,
[],
Tabs
).


@ -60,7 +60,8 @@
{emqx_statsd, 1},
{emqx_plugin_libs, 1},
{emqx_persistent_session, 1},
{emqx_ds, 3}
{emqx_ds, 3},
{emqx_node_rebalance_purge, 1}
]).
%% List of known RPC backend modules:
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").


@ -414,24 +414,32 @@ t_handle_in_auth(_) ->
emqx_channel:handle_in(?AUTH_PACKET(), Channel).
t_handle_in_frame_error(_) ->
IdleChannel = channel(#{conn_state => idle}),
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan} =
emqx_channel:handle_in({frame_error, #{cause => frame_too_large}}, IdleChannel),
IdleChannelV5 = channel(#{conn_state => idle}),
%% no CONNACK packet for v4
?assertMatch(
{shutdown, #{shutdown_count := frame_too_large, cause := frame_too_large}, _Chan},
emqx_channel:handle_in(
{frame_error, #{cause => frame_too_large}}, v4(IdleChannelV5)
)
),
ConnectingChan = channel(#{conn_state => connecting}),
ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
{shutdown,
#{
shutdown_count := frame_too_large,
cause := frame_too_large,
limit := 100,
received := 101
},
ConnackPacket,
_} =
?assertMatch(
{shutdown,
#{
shutdown_count := frame_too_large,
cause := frame_too_large,
limit := 100,
received := 101
},
ConnackPacket, _},
emqx_channel:handle_in(
{frame_error, #{cause => frame_too_large, received => 101, limit => 100}},
ConnectingChan
),
)
),
DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
ConnectedChan = channel(#{conn_state => connected}),
?assertMatch(


@ -78,6 +78,7 @@
start_epmd/0,
start_peer/2,
stop_peer/1,
ebin_path/0,
listener_port/2
]).


@ -138,13 +138,14 @@ init_per_testcase(t_refresh_config = TestCase, Config) ->
];
init_per_testcase(TestCase, Config) when
TestCase =:= t_update_listener;
TestCase =:= t_update_listener_enable_disable;
TestCase =:= t_validations
->
ct:timetrap({seconds, 30}),
ok = snabbkaffe:start_trace(),
%% when running emqx standalone tests, we can't use those
%% features.
case does_module_exist(emqx_management) of
case does_module_exist(emqx_mgmt) of
true ->
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
@ -165,7 +166,7 @@ init_per_testcase(TestCase, Config) when
{emqx_conf, #{config => #{listeners => #{ssl => #{default => ListenerConf}}}}},
emqx,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
),
@ -206,6 +207,7 @@ read_crl(Filename) ->
end_per_testcase(TestCase, Config) when
TestCase =:= t_update_listener;
TestCase =:= t_update_listener_enable_disable;
TestCase =:= t_validations
->
Skip = proplists:get_bool(skip_does_not_apply, Config),
@ -1057,3 +1059,104 @@ do_t_validations(_Config) ->
),
ok.
%% Checks that if CRL is ever enabled and then disabled, clients can connect, even if they
%% would otherwise not have their corresponding CRLs cached and fail with `{bad_crls,
%% no_relevant_crls}`.
t_update_listener_enable_disable(Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ct:pal("skipping as this test does not apply in this profile"),
ok;
false ->
do_t_update_listener_enable_disable(Config)
end.
do_t_update_listener_enable_disable(Config) ->
DataDir = ?config(data_dir, Config),
Keyfile = filename:join([DataDir, "server.key.pem"]),
Certfile = filename:join([DataDir, "server.cert.pem"]),
Cacertfile = filename:join([DataDir, "ca-chain.cert.pem"]),
ClientCert = filename:join(DataDir, "client.cert.pem"),
ClientKey = filename:join(DataDir, "client.key.pem"),
ListenerId = "ssl:default",
%% Enable CRL
{ok, {{_, 200, _}, _, ListenerData0}} = get_listener_via_api(ListenerId),
CRLConfig0 =
#{
<<"ssl_options">> =>
#{
<<"keyfile">> => Keyfile,
<<"certfile">> => Certfile,
<<"cacertfile">> => Cacertfile,
<<"enable_crl_check">> => true,
<<"fail_if_no_peer_cert">> => true
}
},
ListenerData1 = emqx_utils_maps:deep_merge(ListenerData0, CRLConfig0),
{ok, {_, _, ListenerData2}} = update_listener_via_api(ListenerId, ListenerData1),
?assertMatch(
#{
<<"ssl_options">> :=
#{
<<"enable_crl_check">> := true,
<<"verify">> := <<"verify_peer">>,
<<"fail_if_no_peer_cert">> := true
}
},
ListenerData2
),
%% Disable CRL
CRLConfig1 =
#{
<<"ssl_options">> =>
#{
<<"keyfile">> => Keyfile,
<<"certfile">> => Certfile,
<<"cacertfile">> => Cacertfile,
<<"enable_crl_check">> => false,
<<"fail_if_no_peer_cert">> => true
}
},
ListenerData3 = emqx_utils_maps:deep_merge(ListenerData2, CRLConfig1),
redbug:start(
[
"esockd_server:get_listener_prop -> return",
"esockd_server:set_listener_prop -> return",
"esockd:merge_opts -> return",
"esockd_listener_sup:set_options -> return",
"emqx_listeners:inject_crl_config -> return"
],
[{msgs, 100}]
),
{ok, {_, _, ListenerData4}} = update_listener_via_api(ListenerId, ListenerData3),
?assertMatch(
#{
<<"ssl_options">> :=
#{
<<"enable_crl_check">> := false,
<<"verify">> := <<"verify_peer">>,
<<"fail_if_no_peer_cert">> := true
}
},
ListenerData4
),
%% Now the client that would be blocked tries to connect and should now be allowed.
{ok, C} = emqtt:start_link([
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
?assertMatch({ok, _}, emqtt:connect(C)),
emqtt:stop(C),
?assertNotReceive({http_get, _}),
ok.


@ -64,6 +64,7 @@
-export([work_dir/1]).
-export([work_dir/2]).
-export([clean_work_dir/1]).
-export([load_apps/1]).
-export([start_apps/2]).
@ -78,6 +79,8 @@
%% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs
-export([schema_module/0, upgrade_raw_conf/1]).
-export([skip_if_oss/0]).
-export_type([appspec/0]).
-export_type([appspec_opts/0]).
@ -162,6 +165,7 @@ start(Apps, SuiteOpts = #{work_dir := WorkDir}) ->
% 4. Setup isolated mnesia directory
ok = emqx_common_test_helpers:load(mnesia),
ok = application:set_env(mnesia, dir, filename:join([WorkDir, mnesia])),
ok = application:set_env(emqx_durable_storage, db_data_dir, filename:join([WorkDir, ds])),
% 5. Start ekka separately.
% For some reason it's designed to be started in non-regular way, so we have to track
% applications started in the process manually.
@ -387,6 +391,8 @@ default_appspec(emqx_schema_validation, _SuiteOpts) ->
#{schema_mod => emqx_schema_validation_schema, config => #{}};
default_appspec(emqx_message_transformation, _SuiteOpts) ->
#{schema_mod => emqx_message_transformation_schema, config => #{}};
default_appspec(emqx_ds_shared_sub, _SuiteOpts) ->
#{schema_mod => emqx_ds_shared_sub_schema, config => #{}};
default_appspec(_, _) ->
#{}.
@ -432,6 +438,16 @@ work_dir(TCName, CTConfig) ->
WorkDir = work_dir(CTConfig),
filename:join(WorkDir, TCName).
%% @doc Delete contents of the workdir.
clean_work_dir(WorkDir) ->
ct:pal("Cleaning workdir ~p", [WorkDir]),
case re:run(WorkDir, "./_build/test/logs/") of
{match, _} ->
file:del_dir_r(WorkDir);
nomatch ->
error({unsafe_workdir, WorkDir})
end.
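A hedged usage sketch (the test case name is illustrative): callers are expected to pass a directory obtained from work_dir/2, which lives under the CT logs tree and therefore passes the safety check above.
%% Illustrative only.
cleanup_example(CTConfig) ->
    WorkDir = emqx_cth_suite:work_dir(t_some_case, CTConfig),
    _ = emqx_cth_suite:clean_work_dir(WorkDir),
    ok.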
%%
start_ekka() ->
@ -507,3 +523,14 @@ upgrade_raw_conf(Conf) ->
ce ->
emqx_conf_schema:upgrade_raw_conf(Conf)
end.
skip_if_oss() ->
try emqx_release:edition() of
ee ->
false;
_ ->
{skip, not_supported_in_oss}
catch
error:undef ->
{skip, standalone_not_supported}
end.


@ -56,6 +56,8 @@ t_exclusive_sub(_) ->
{ok, _} = emqtt:connect(C1),
?CHECK_SUB(C1, 0),
?CHECK_SUB(C1, 0),
{ok, C2} = emqtt:start_link([
{clientid, <<"client2">>},
{clean_start, false},


@ -63,6 +63,7 @@ groups() ->
t_parse_malformed_properties,
t_malformed_connect_header,
t_malformed_connect_data,
t_malformed_connect_data_proto_ver,
t_reserved_connect_flag,
t_invalid_clientid,
t_undefined_password,
@ -167,6 +168,8 @@ t_parse_malformed_utf8_string(_) ->
ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}),
?ASSERT_FRAME_THROW(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
%% TODO: parse v3 with 0 length clientid
t_serialize_parse_v3_connect(_) ->
Bin =
<<16, 37, 0, 6, 77, 81, 73, 115, 100, 112, 3, 2, 0, 60, 0, 23, 109, 111, 115, 113, 112, 117,
@ -324,7 +327,7 @@ t_serialize_parse_bridge_connect(_) ->
header = #mqtt_packet_header{type = ?CONNECT},
variable = #mqtt_packet_connect{
clientid = <<"C_00:0C:29:2B:77:52">>,
proto_ver = 16#03,
proto_ver = ?MQTT_PROTO_V3,
proto_name = <<"MQIsdp">>,
is_bridge = true,
will_retain = true,
@ -686,15 +689,36 @@ t_malformed_connect_header(_) ->
).
t_malformed_connect_data(_) ->
ProtoNameWithLen = <<0, 6, "MQIsdp">>,
ConnectFlags = <<2#00000000>>,
ClientIdwithLen = <<0, 1, "a">>,
UnexpectedRestBin = <<0, 1, 2>>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, unexpected_trailing_bytes := _},
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 0, 0, 0, 0, 0, 0>>)
#{cause := malformed_connect, unexpected_trailing_bytes := 3},
emqx_frame:parse(
<<16, 18, ProtoNameWithLen/binary, ?MQTT_PROTO_V3, ConnectFlags/binary, 0, 0,
ClientIdwithLen/binary, UnexpectedRestBin/binary>>
)
).
t_malformed_connect_data_proto_ver(_) ->
Proto3NameWithLen = <<0, 6, "MQIsdp">>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, header_bytes := <<>>},
emqx_frame:parse(<<16, 8, Proto3NameWithLen/binary>>)
),
ProtoNameWithLen = <<0, 4, "MQTT">>,
?ASSERT_FRAME_THROW(
#{cause := malformed_connect, header_bytes := <<>>},
emqx_frame:parse(<<16, 6, ProtoNameWithLen/binary>>)
).
t_reserved_connect_flag(_) ->
?assertException(
throw,
{frame_parse_error, reserved_connect_flag},
{frame_parse_error, #{
cause := reserved_connect_flag, proto_ver := ?MQTT_PROTO_V3, proto_name := <<"MQIsdp">>
}},
emqx_frame:parse(<<16, 15, 0, 6, 77, 81, 73, 115, 100, 112, 3, 1, 0, 0, 1, 0, 0>>)
).
@ -726,7 +750,7 @@ t_undefined_password(_) ->
},
variable = #mqtt_packet_connect{
proto_name = <<"MQTT">>,
proto_ver = 4,
proto_ver = ?MQTT_PROTO_V4,
is_bridge = false,
clean_start = true,
will_flag = false,
@ -774,7 +798,9 @@ t_invalid_will_retain(_) ->
54, 75, 78, 112, 57, 0, 6, 68, 103, 55, 87, 87, 87>>,
?assertException(
throw,
{frame_parse_error, invalid_will_retain},
{frame_parse_error, #{
cause := invalid_will_retain, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBin)
),
ok.
@ -796,22 +822,30 @@ t_invalid_will_qos(_) ->
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS1))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS2))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_F_WillQoS3))
),
?assertException(
throw,
{frame_parse_error, invalid_will_qos},
{frame_parse_error, #{
cause := invalid_will_qos, proto_ver := ?MQTT_PROTO_V5, proto_name := <<"MQTT">>
}},
emqx_frame:parse(ConnectBinFun(Will_T_WillQoS3))
),
ok.


@ -26,6 +26,7 @@
%% Have to use real msgs, as the schema is guarded by enum.
-define(THROTTLE_MSG, authorization_permission_denied).
-define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized).
-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error).
-define(TIME_WINDOW, <<"1s">>).
all() -> emqx_common_test_helpers:all(?MODULE).
@ -59,6 +60,11 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)),
emqx_config:delete_override_conf_files().
init_per_testcase(t_throttle_recoverable_msg, Config) ->
ok = snabbkaffe:start_trace(),
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}),
Config;
init_per_testcase(t_throttle_add_new_msg, Config) ->
ok = snabbkaffe:start_trace(),
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(t_throttle_recoverable_msg, _Config) ->
ok = snabbkaffe:stop(),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
ok;
end_per_testcase(t_throttle_add_new_msg, _Config) ->
ok = snabbkaffe:stop(),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
@ -101,8 +111,8 @@ t_throttle(_Config) ->
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -115,14 +125,48 @@ t_throttle(_Config) ->
[]
).
t_throttle_recoverable_msg(_Config) ->
ResourceId = <<"resource_id">>,
ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]),
?check_trace(
begin
%% Warm-up and block to increase the probability that next events
%% will be in the same throttling time window.
{ok, _} = ?block_until(
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG},
5000
),
{_, {ok, _}} = ?wait_async_action(
events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId),
#{
?snk_kind := log_throttler_dropped,
throttled_msg := ThrottledMsg
},
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
throttled_msg := ThrottledMsg,
dropped_count := 1
},
3000
)
end,
[]
).
t_throttle_add_new_msg(_Config) ->
?check_trace(
begin
{ok, _} = ?block_until(
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -137,10 +181,15 @@ t_throttle_add_new_msg(_Config) ->
t_throttle_no_msg(_Config) ->
%% Must simply pass with no crashes
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
timer:sleep(10),
?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
Pid = erlang:whereis(emqx_log_throttler),
?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
%% assert process is not restarted
?assertEqual(Pid, erlang:whereis(emqx_log_throttler)),
%% make a gen_call to ensure the process is alive
%% note: this call result in an 'unexpected_call' error log.
?assertEqual(ignored, gen_server:call(Pid, probe)),
ok.
t_update_time_window(_Config) ->
?check_trace(
@ -168,8 +217,8 @@ t_throttle_debug_primary_level(_Config) ->
#{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG},
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -187,10 +236,13 @@ t_throttle_debug_primary_level(_Config) ->
%%--------------------------------------------------------------------
events(Msg) ->
events(100, Msg).
events(100, Msg, undefined).
events(N, Msg) ->
[emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)].
events(Msg, Id) ->
events(100, Msg, Id).
events(N, Msg, Id) ->
[emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)].
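A hedged sketch of the two-argument allow/2 form exercised above; the resource id is a placeholder. The extra identifier lets a message such as unrecoverable_resource_error be throttled per resource rather than once globally.
%% Illustrative only; returns whether the caller should emit the log line.
throttle_example(ResourceId) ->
    case emqx_log_throttler:allow(unrecoverable_resource_error, ResourceId) of
        true -> log;
        false -> drop
    end.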
module_exists(Mod) ->
case erlang:module_loaded(Mod) of


@ -377,42 +377,60 @@ t_will_msg(_) ->
t_format(_) ->
io:format("~ts", [
emqx_packet:format(#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0},
variable = undefined
})
]),
io:format("~ts", [
emqx_packet:format(#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK}, variable = 1, payload = <<"payload">>
})
emqx_packet:format(
#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0},
variable = undefined
},
text
)
]),
io:format(
"~ts",
[
emqx_packet:format(
#mqtt_packet{
header = #mqtt_packet_header{type = ?CONNACK},
variable = 1,
payload = <<"payload">>
},
text
)
]
),
io:format("~ts", [
emqx_packet:format(
?CONNECT_PACKET(#mqtt_packet_connect{
will_flag = true,
will_retain = true,
will_qos = ?QOS_2,
will_topic = <<"topic">>,
will_payload = <<"payload">>
})
?CONNECT_PACKET(
#mqtt_packet_connect{
will_flag = true,
will_retain = true,
will_qos = ?QOS_2,
will_topic = <<"topic">>,
will_payload = <<"payload">>
}
),
text
)
]),
io:format("~ts", [
emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}))
emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}), text)
]),
io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99))]),
io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER), text)]),
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1), text)]),
io:format("~ts", [
emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))
emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>), text)
]),
io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90))]),
io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128))]).
io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98), text)]),
io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99), text)]),
io:format("~ts", [
emqx_packet:format(
?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]), text
)
]),
io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]), text)]),
io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]), text)]),
io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90), text)]),
io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128), text)]).
t_parse_empty_publish(_) ->
%% 52: 0011(type=PUBLISH) 0100 (QoS=2)


@ -124,4 +124,18 @@ t_hash(_) ->
false = emqx_passwd:check_pass({pbkdf2, sha, Pbkdf2Salt, 2, BadDKlen}, Pbkdf2, Password),
%% Invalid derived_length, pbkdf2 fails
?assertException(error, _, emqx_passwd:hash({pbkdf2, sha, Pbkdf2Salt, 2, BadDKlen}, Password)).
?assertException(error, _, emqx_passwd:hash({pbkdf2, sha, Pbkdf2Salt, 2, BadDKlen}, Password)),
%% invalid salt (not binary)
?assertException(
error,
{salt_not_string, false},
emqx_passwd:hash({sha256, false, suffix}, Password)
),
%% invalid password (not binary)
?assertException(
error,
{password_not_string, bad_password_type},
emqx_passwd:hash({sha256, Salt, suffix}, bad_password_type)
).


@ -573,7 +573,7 @@ app_specs(Opts) ->
cluster() ->
ExtraConf = "\n durable_storage.messages.n_sites = 2",
Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})},
Spec = #{apps => app_specs(#{extra_emqx_conf => ExtraConf})},
[
{persistent_messages_SUITE1, Spec},
{persistent_messages_SUITE2, Spec}


@ -816,8 +816,8 @@ t_no_limiter_for_listener(_) ->
CfgStr = <<>>,
ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
ListenerOpt = emqx:get_config([listeners, tcp, default]),
?assertEqual(
undefined,
?assertMatch(
#{connection := #{rate := infinity}},
emqx_limiter_utils:get_listener_opts(ListenerOpt)
).


@ -64,18 +64,28 @@ init_per_group(routing_schema_v2, Config) ->
init_per_group(batch_sync_on, Config) ->
[{emqx_config, "broker.routing.batch_sync.enable_on = all"} | Config];
init_per_group(batch_sync_replicants, Config) ->
[{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config];
case emqx_cth_suite:skip_if_oss() of
false ->
[{emqx_config, "broker.routing.batch_sync.enable_on = replicant"} | Config];
True ->
True
end;
init_per_group(batch_sync_off, Config) ->
[{emqx_config, "broker.routing.batch_sync.enable_on = none"} | Config];
init_per_group(cluster, Config) ->
WorkDir = emqx_cth_suite:work_dir(Config),
NodeSpecs = [
{emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(1, Config)], role => core}},
{emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(2, Config)], role => core}},
{emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}}
],
Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
[{cluster, Nodes} | Config];
case emqx_cth_suite:skip_if_oss() of
false ->
WorkDir = emqx_cth_suite:work_dir(Config),
NodeSpecs = [
{emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(1, Config)], role => core}},
{emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(2, Config)], role => core}},
{emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}}
],
Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
[{cluster, Nodes} | Config];
True ->
True
end;
init_per_group(GroupName, Config) when
GroupName =:= single_batch_on;
GroupName =:= single


@ -1247,7 +1247,7 @@ recv_msgs(Count, Msgs) ->
start_peer(Name, Port) ->
{ok, Node} = emqx_cth_peer:start_link(
Name,
ebin_path()
emqx_common_test_helpers:ebin_path()
),
pong = net_adm:ping(Node),
setup_node(Node, Port),
@ -1261,9 +1261,6 @@ host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
ebin_path() ->
["-pa" | code:get_path()].
setup_node(Node, Port) ->
EnvHandler =
fun(_) ->


@ -28,7 +28,7 @@
-type authenticator_id() :: binary().
-define(AUTHN_RESOURCE_GROUP, <<"emqx_authn">>).
-define(AUTHN_RESOURCE_GROUP, <<"authn">>).
%% VAR_NS_CLIENT_ATTRS is added here because it can be initialized before authn.
%% NOTE: authn return may add more to (or even overwrite) client_attrs.


@ -156,7 +156,7 @@
count => 1
}).
-define(AUTHZ_RESOURCE_GROUP, <<"emqx_authz">>).
-define(AUTHZ_RESOURCE_GROUP, <<"authz">>).
-define(AUTHZ_FEATURES, [rich_actions]).


@ -28,6 +28,7 @@ defmodule EMQXAuth.MixProject do
def deps() do
[
{:emqx_mix_utils, in_umbrella: true, runtime: false},
{:emqx, in_umbrella: true},
{:emqx_utils, in_umbrella: true}
]


@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth, [
{description, "EMQX Authentication and authorization"},
{vsn, "0.3.3"},
{vsn, "0.3.4"},
{modules, []},
{registered, [emqx_auth_sup]},
{applications, [


@ -203,6 +203,7 @@ common_fields() ->
enable(type) -> boolean();
enable(default) -> true;
enable(importance) -> ?IMPORTANCE_NO_DOC;
enable(desc) -> ?DESC(?FUNCTION_NAME);
enable(_) -> undefined.


@ -477,9 +477,15 @@ authorize_deny(
sources()
) ->
authz_result().
authorize(Client, PubSub, Topic, _DefaultResult, Sources) ->
authorize(#{username := Username} = Client, PubSub, Topic, _DefaultResult, Sources) ->
case maps:get(is_superuser, Client, false) of
true ->
?tp(authz_skipped, #{reason => client_is_superuser, action => PubSub}),
?TRACE("AUTHZ", "authorization_skipped_as_superuser", #{
username => Username,
topic => Topic,
action => emqx_access_control:format_action(PubSub)
}),
emqx_metrics:inc(?METRIC_SUPERUSER),
{stop, #{result => allow, from => superuser}};
false ->


@ -470,7 +470,13 @@ make_result_map(ResList) ->
lists:foldl(Fun, {maps:new(), maps:new(), maps:new(), maps:new()}, ResList).
restructure_map(#{
counters := #{deny := Failed, total := Total, allow := Succ, nomatch := Nomatch},
counters := #{
ignore := Ignore,
deny := Failed,
total := Total,
allow := Succ,
nomatch := Nomatch
},
rate := #{total := #{current := Rate, last5m := Rate5m, max := RateMax}}
}) ->
#{
@ -478,6 +484,7 @@ restructure_map(#{
allow => Succ,
deny => Failed,
nomatch => Nomatch,
ignore => Ignore,
rate => Rate,
rate_last5m => Rate5m,
rate_max => RateMax


@ -68,7 +68,8 @@
-export_type([
permission_resolution/0,
action_condition/0,
topic_condition/0
topic_condition/0,
rule/0
]).
%%--------------------------------------------------------------------
@ -197,7 +198,7 @@ qos_from_opts(Opts) ->
)
end
catch
{bad_qos, QoS} ->
throw:{bad_qos, QoS} ->
throw(#{
reason => invalid_authorization_qos,
qos => QoS


@ -21,7 +21,7 @@
-module(emqx_authz_rule_raw).
-export([parse_rule/1, format_rule/1]).
-export([parse_rule/1, parse_and_compile_rules/1, format_rule/1]).
-include("emqx_authz.hrl").
@ -55,6 +55,27 @@
%% API
%%--------------------------------------------------------------------
%% @doc Parse and compile raw ACL rules.
%% If any bad rule is found, `{bad_acl_rule, ..}' is thrown.
-spec parse_and_compile_rules([rule_raw()]) -> [emqx_authz_rule:rule()].
parse_and_compile_rules(Rules) ->
lists:map(
fun(Rule) ->
case parse_rule(Rule) of
{ok, {Permission, Action, Topics}} ->
try
emqx_authz_rule:compile({Permission, all, Action, Topics})
catch
throw:Reason ->
throw({bad_acl_rule, Reason})
end;
{error, Reason} ->
throw({bad_acl_rule, Reason})
end
end,
Rules
).
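A hedged usage sketch (the rule content is illustrative): raw rules in their binary-keyed map form are parsed and compiled in one pass, and the first bad rule surfaces as a thrown {bad_acl_rule, Reason}.
%% Illustrative only.
compile_raw_rules_example() ->
    Raw = [
        #{
            <<"permission">> => <<"allow">>,
            <<"action">> => <<"publish">>,
            <<"topic">> => <<"t/1">>
        }
    ],
    try
        {ok, emqx_authz_rule_raw:parse_and_compile_rules(Raw)}
    catch
        throw:{bad_acl_rule, Reason} -> {error, Reason}
    end.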
-spec parse_rule(rule_raw()) ->
{ok, {
emqx_authz_rule:permission_resolution_precompile(),


@ -88,6 +88,7 @@ fields("metrics_status_fields") ->
fields("metrics") ->
[
{"total", ?HOCON(integer(), #{desc => ?DESC("metrics_total")})},
{"ignore", ?HOCON(integer(), #{desc => ?DESC("ignore")})},
{"allow", ?HOCON(integer(), #{desc => ?DESC("allow")})},
{"deny", ?HOCON(integer(), #{desc => ?DESC("deny")})},
{"nomatch", ?HOCON(float(), #{desc => ?DESC("nomatch")})}
@ -169,7 +170,12 @@ api_authz_refs() ->
authz_common_fields(Type) ->
[
{type, ?HOCON(Type, #{required => true, desc => ?DESC(type)})},
{enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}
{enable,
?HOCON(boolean(), #{
default => true,
importance => ?IMPORTANCE_NO_DOC,
desc => ?DESC(enable)
})}
].
source_types() ->


@ -16,6 +16,9 @@
-module(emqx_authz_utils).
-feature(maybe_expr, enable).
-include_lib("emqx/include/emqx_placeholder.hrl").
-include_lib("emqx_authz.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
@ -28,7 +31,7 @@
remove_resource/1,
update_config/2,
vars_for_rule_query/2,
parse_rule_from_row/2
do_authorize/6
]).
-export([
@ -133,14 +136,18 @@ content_type(Headers) when is_list(Headers) ->
-define(RAW_RULE_KEYS, [<<"permission">>, <<"action">>, <<"topic">>, <<"qos">>, <<"retain">>]).
parse_rule_from_row(ColumnNames, Row) ->
RuleRaw = maps:with(?RAW_RULE_KEYS, maps:from_list(lists:zip(ColumnNames, to_list(Row)))),
case emqx_authz_rule_raw:parse_rule(RuleRaw) of
-spec parse_rule_from_row([binary()], [binary()] | map()) ->
{ok, emqx_authz_rule:rule()} | {error, term()}.
parse_rule_from_row(_ColumnNames, RuleMap = #{}) ->
case emqx_authz_rule_raw:parse_rule(RuleMap) of
{ok, {Permission, Action, Topics}} ->
emqx_authz_rule:compile({Permission, all, Action, Topics});
{ok, emqx_authz_rule:compile({Permission, all, Action, Topics})};
{error, Reason} ->
error(Reason)
end.
{error, Reason}
end;
parse_rule_from_row(ColumnNames, Row) ->
RuleMap = maps:with(?RAW_RULE_KEYS, maps:from_list(lists:zip(ColumnNames, to_list(Row)))),
parse_rule_from_row(ColumnNames, RuleMap).
vars_for_rule_query(Client, ?authz_action(PubSub, Qos) = Action) ->
Client#{
@ -157,3 +164,39 @@ to_list(Tuple) when is_tuple(Tuple) ->
tuple_to_list(Tuple);
to_list(List) when is_list(List) ->
List.
do_authorize(Type, Client, Action, Topic, ColumnNames, Row) ->
try
maybe
{ok, Rule} ?= parse_rule_from_row(ColumnNames, Row),
{matched, Permission} ?= emqx_authz_rule:match(Client, Action, Topic, Rule),
{matched, Permission}
else
nomatch ->
nomatch;
{error, Reason0} ->
log_match_rule_error(Type, Row, Reason0),
nomatch
end
catch
throw:Reason1 ->
log_match_rule_error(Type, Row, Reason1),
nomatch
end.
log_match_rule_error(Type, Row, Reason0) ->
Msg0 = #{
msg => "match_rule_error",
rule => Row,
type => Type
},
Msg1 =
case is_map(Reason0) of
true -> maps:merge(Msg0, Reason0);
false -> Msg0#{reason => Reason0}
end,
?SLOG(
error,
Msg1,
#{tag => "AUTHZ"}
).
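A hedged sketch of how a backend row might flow through do_authorize/6 (the source type, column names and row values are placeholders): parsing and matching happen in one call, and a row that fails to parse is logged and treated as nomatch.
%% Illustrative only.
authorize_row_example(Client, Action, Topic) ->
    ColumnNames = [<<"permission">>, <<"action">>, <<"topic">>],
    Row = [<<"allow">>, <<"publish">>, <<"t/#">>],
    emqx_authz_utils:do_authorize(mysql, Client, Action, Topic, ColumnNames, Row).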


@ -0,0 +1,3 @@
-----BEGIN PUBLIC KEY-----
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
-----END PUBLIC KEY-----
