From 1d1f595e6f402450d5092d27e58572d30242692f Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Thu, 21 Mar 2024 17:30:45 +0200 Subject: [PATCH 01/29] fix(emqx_mgmt_data_backup): remove an uploaded backup file if it's not valid --- apps/emqx_management/src/emqx_management.app.src | 2 +- apps/emqx_management/src/emqx_mgmt_data_backup.erl | 2 ++ .../test/emqx_mgmt_api_data_backup_SUITE.erl | 6 +++++- changes/ce/fix-12759.en.md | 1 + 4 files changed, 9 insertions(+), 2 deletions(-) create mode 100644 changes/ce/fix-12759.en.md diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index bd596ffd4..bc2425a55 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -2,7 +2,7 @@ {application, emqx_management, [ {description, "EMQX Management API and CLI"}, % strict semver, bump manually! - {vsn, "5.1.0"}, + {vsn, "5.1.1"}, {modules, []}, {registered, [emqx_management_sup]}, {applications, [ diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index d88a4d998..2aaa014a8 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -315,8 +315,10 @@ do_upload(BackupFileNameStr, BackupFileContent) -> catch error:{badmatch, {error, Reason}}:Stack -> ?SLOG(error, #{msg => "emqx_data_upload_failed", reason => Reason, stacktrace => Stack}), + _ = file:delete(FilePath), {error, Reason}; Class:Reason:Stack -> + _ = file:delete(FilePath), ?SLOG(error, #{ msg => "emqx_data_upload_failed", exception => Class, diff --git a/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl index e94de971d..6a580fd57 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl @@ -199,7 +199,11 @@ upload_backup_test(Config, BackupName) -> ?assertEqual(ok, upload_backup(?NODE3_PORT, Auth, UploadFile)), %% This file was specially forged to pass upload validation bat fail on import ?assertEqual(ok, upload_backup(?NODE2_PORT, Auth, BadImportFile)), - ?assertEqual({error, bad_request}, upload_backup(?NODE1_PORT, Auth, BadUploadFile)). + ?assertEqual({error, bad_request}, upload_backup(?NODE1_PORT, Auth, BadUploadFile)), + %% Invalid file must not be kept + ?assertMatch( + {error, {_, 404, _}}, backup_file_op(get, ?NODE1_PORT, Auth, ?BAD_UPLOAD_BACKUP, []) + ). import_backup_test(Config, BackupName) -> Auth = ?config(auth, Config), diff --git a/changes/ce/fix-12759.en.md b/changes/ce/fix-12759.en.md new file mode 100644 index 000000000..2906bd17e --- /dev/null +++ b/changes/ce/fix-12759.en.md @@ -0,0 +1 @@ +Do not save invalid uploaded backup files. From 906a77d167617509c9509542bf2300f511d628bd Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 22 Mar 2024 12:12:37 +0200 Subject: [PATCH 02/29] chore: rename `message_queue_too_long` error reason to `mailbox_overflow` `mailbox_overflow` is consistent with the corresponding config parameter: 'force_shutdown.max_mailbox_size' --- apps/emqx_utils/src/emqx_utils.app.src | 2 +- apps/emqx_utils/src/emqx_utils.erl | 2 +- apps/emqx_utils/test/emqx_utils_SUITE.erl | 2 +- changes/ce/fix-12766.en.md | 3 +++ 4 files changed, 6 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-12766.en.md diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index 9e2f77d71..3dffe8cec 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -2,7 +2,7 @@ {application, emqx_utils, [ {description, "Miscellaneous utilities for EMQX apps"}, % strict semver, bump manually! - {vsn, "5.1.0"}, + {vsn, "5.1.1"}, {modules, [ emqx_utils, emqx_utils_api, diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 0be489696..aa6b7d2cf 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -261,7 +261,7 @@ check_oom(Pid, #{ ok; [{message_queue_len, QLen}, {total_heap_size, HeapSize}] -> do_check_oom([ - {QLen, MaxQLen, message_queue_too_long}, + {QLen, MaxQLen, mailbox_overflow}, {HeapSize, MaxHeapSize, proc_heap_too_large} ]) end. diff --git a/apps/emqx_utils/test/emqx_utils_SUITE.erl b/apps/emqx_utils/test/emqx_utils_SUITE.erl index c3a8cc17f..acb2623de 100644 --- a/apps/emqx_utils/test/emqx_utils_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_SUITE.erl @@ -150,7 +150,7 @@ t_check(_) -> ?assertEqual(ok, emqx_utils:check_oom(Policy)), [self() ! {msg, I} || I <- lists:seq(1, 6)], ?assertEqual( - {shutdown, #{reason => message_queue_too_long, value => 11, max => 10}}, + {shutdown, #{reason => mailbox_overflow, value => 11, max => 10}}, emqx_utils:check_oom(Policy) ). diff --git a/changes/ce/fix-12766.en.md b/changes/ce/fix-12766.en.md new file mode 100644 index 000000000..51ace3faf --- /dev/null +++ b/changes/ce/fix-12766.en.md @@ -0,0 +1,3 @@ +Rename `message_queue_too_long` error reason to `mailbox_overflow` + +`mailbox_overflow` is consistent with the corresponding config parameter: `force_shutdown.max_mailbox_size`. From 04bf763890bc0591fc340a13ef01ba1cda31ce12 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 21 Mar 2024 11:49:00 -0300 Subject: [PATCH 03/29] fix(kafka-based bridges): avoid trying to get raw config for replayq dir Fixes https://emqx.atlassian.net/browse/EMQX-12049 --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src | 2 +- .../src/emqx_bridge_kafka_impl_producer.erl | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 74ba58217..b3025113a 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 20241fdcd..6bb1690ff 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -668,9 +668,8 @@ partitioner(random) -> random; partitioner(key_dispatch) -> first_key_dispatch. replayq_dir(BridgeType, BridgeName) -> - RawConf = emqx_conf:get_raw([actions, BridgeType, BridgeName]), DirName = iolist_to_binary([ - emqx_bridge_lib:downgrade_type(BridgeType, RawConf), + maybe_v1_type_name(BridgeType), ":", BridgeName, ":", @@ -678,6 +677,14 @@ replayq_dir(BridgeType, BridgeName) -> ]), filename:join([emqx:data_dir(), "kafka", DirName]). +%% To avoid losing queued data on disk, we must use the same directory as the old v1 +%% bridges, if any. Among the Kafka-based bridges that exist since v1, only Kafka changed +%% its type name. Other bridges are either unchanged, or v2-only, and should use their v2 +%% type names. +maybe_v1_type_name(Type) when is_atom(Type) -> maybe_v1_type_name(atom_to_binary(Type)); +maybe_v1_type_name(<<"kafka_producer">>) -> <<"kafka">>; +maybe_v1_type_name(Type) -> Type. + with_log_at_error(Fun, Log) -> try Fun() From 7982dd017b67eb9000206cedc0c40bc40c38af23 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sat, 23 Mar 2024 10:29:05 +0100 Subject: [PATCH 04/29] chore: upgrade http client libs gun-1.3.11 and ehttpc-0.4.13 --- changes/ce/fix-12773.en.md | 8 ++++++++ mix.exs | 4 ++-- rebar.config | 4 ++-- 3 files changed, 12 insertions(+), 4 deletions(-) create mode 100644 changes/ce/fix-12773.en.md diff --git a/changes/ce/fix-12773.en.md b/changes/ce/fix-12773.en.md new file mode 100644 index 000000000..c4a1f80aa --- /dev/null +++ b/changes/ce/fix-12773.en.md @@ -0,0 +1,8 @@ +Upgrade HTTP client libraries. + +The HTTP client library (`gun-1.3`) incorrectly appends a `:portnumber` suffix to the `Host` header for +standard ports (`http` on port 80, `https` on port 443). This could cause compatibility issues with servers or +gateways performing strict `Host` header checks (e.g., AWS Lambda, Alibaba Cloud HTTP gateways), leading to +errors such as `InvalidCustomDomain.NotFound` or "The specified CustomDomain does not exist." + + diff --git a/mix.exs b/mix.exs index 09f5f0692..74c33f6b3 100644 --- a/mix.exs +++ b/mix.exs @@ -49,7 +49,7 @@ defmodule EMQXUmbrella.MixProject do {:redbug, github: "emqx/redbug", tag: "2.0.10"}, {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.4.12", override: true}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.4.13", override: true}, {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, @@ -77,7 +77,7 @@ defmodule EMQXUmbrella.MixProject do {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, # in conflict by ehttpc and emqtt - {:gun, github: "emqx/gun", tag: "1.3.10", override: true}, + {:gun, github: "emqx/gun", tag: "1.3.11", override: true}, # in conflict by emqx_connector and system_monitor {:epgsql, github: "emqx/epgsql", tag: "4.7.1.1", override: true}, # in conflict by emqx and observer_cli diff --git a/rebar.config b/rebar.config index 238dca515..b4d52e867 100644 --- a/rebar.config +++ b/rebar.config @@ -76,8 +76,8 @@ {covertool, {git, "https://github.com/zmstone/covertool", {tag, "2.0.4.1"}}}, {gpb, "4.19.9"}, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}, - {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.10"}}}, - {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.12"}}}, + {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.11"}}}, + {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.13"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, From 5367893427e5f431aefa882cde1aafe27b867ad4 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Thu, 28 Mar 2024 16:41:50 +0100 Subject: [PATCH 05/29] ci(build_packages): restore building tgz --- .github/workflows/build_packages.yaml | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index 4ea381acc..9be54e394 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -151,7 +151,23 @@ jobs: with: ref: ${{ github.event.inputs.ref }} fetch-depth: 0 - - name: build emqx packages + - name: build tgz + env: + PROFILE: ${{ matrix.profile }} + ARCH: ${{ matrix.arch }} + OS: ${{ matrix.os }} + IS_ELIXIR: ${{ matrix.with_elixir }} + BUILDER: "ghcr.io/emqx/emqx-builder/${{ matrix.builder }}:${{ matrix.elixir }}-${{ matrix.otp }}-${{ matrix.os }}" + BUILDER_SYSTEM: force_docker + run: | + ./scripts/buildx.sh \ + --profile $PROFILE \ + --arch $ARCH \ + --builder $BUILDER \ + --elixir $IS_ELIXIR \ + --pkgtype tgz + - name: build pkg + if: matrix.with_elixir == 'no' env: PROFILE: ${{ matrix.profile }} ARCH: ${{ matrix.arch }} From 3eda182e9a320c9c3afc726359f44b8f5a0c4d6b Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 27 Mar 2024 20:04:16 +0200 Subject: [PATCH 06/29] fix: prevent a node from discovering and re-joining the same cluster after it has (manually) left it. --- apps/emqx/rebar.config | 2 +- .../src/emqx_management.app.src | 2 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 25 +++++- .../test/emqx_mgmt_cli_SUITE.erl | 83 +++++++++++++++++++ changes/ce/fix-12802.en.md | 3 + mix.exs | 2 +- rebar.config | 2 +- 7 files changed, 114 insertions(+), 5 deletions(-) create mode 100644 changes/ce/fix-12802.en.md diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 60ac6343f..70cf636e7 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,7 +28,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.1"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.2"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.1"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index bd596ffd4..bc2425a55 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -2,7 +2,7 @@ {application, emqx_management, [ {description, "EMQX Management API and CLI"}, % strict semver, bump manually! - {vsn, "5.1.0"}, + {vsn, "5.1.1"}, {modules, []}, {registered, [emqx_management_sup]}, {applications, [ diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index ddbc60d5c..2af3a8397 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -108,6 +108,7 @@ cluster(["join", SNode]) -> emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error]) end; cluster(["leave"]) -> + _ = maybe_disable_autocluster(), case mria:leave() of ok -> emqx_ctl:print("Leave the cluster successfully.~n"), @@ -139,12 +140,15 @@ cluster(["status"]) -> cluster(["status", "--json"]) -> Info = sort_map_list_fields(cluster_info()), emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Info)]); +cluster(["discovery", "enable"]) -> + enable_autocluster(); cluster(_) -> emqx_ctl:usage([ {"cluster join ", "Join the cluster"}, {"cluster leave", "Leave the cluster"}, {"cluster force-leave ", "Force the node leave from cluster"}, - {"cluster status [--json]", "Cluster status"} + {"cluster status [--json]", "Cluster status"}, + {"cluster discovery enable", "Enable and run automatic cluster discovery (if configured)"} ]). %% sort lists for deterministic output @@ -163,6 +167,25 @@ sort_map_list_field(Field, Map) -> _ -> Map end. +enable_autocluster() -> + ok = ekka:enable_autocluster(), + _ = ekka:autocluster(emqx), + emqx_ctl:print("Automatic cluster discovery enabled.~n"). + +maybe_disable_autocluster() -> + case ekka:autocluster_enabled() of + true -> + ok = ekka:disable_autocluster(), + emqx_ctl:print( + "Automatic cluster discovery is disabled on this node: ~p to avoid" + " re-joining the same cluster again, if the node is not stopped soon." + " To enable it run: 'emqx ctl cluster discovery enable' or restart the node.~n", + [node()] + ); + false -> + ok + end. + %%-------------------------------------------------------------------- %% @doc Query clients diff --git a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl index c81881c95..c6f00bff0 100644 --- a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl @@ -19,6 +19,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -31,6 +32,47 @@ init_per_suite(Config) -> end_per_suite(_) -> emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]). +init_per_testcase(t_autocluster_leave = TC, Config) -> + [Core1, Core2, Core3, Repl] = + Nodes = [ + t_autocluster_leave_core1, + t_autocluster_leave_core2, + t_autocluster_leave_core3, + t_autocluster_leave_replicant + ], + + NodeNames = [emqx_cth_cluster:node_name(N) || N <- Nodes], + AppSpec = [ + emqx, + {emqx_conf, #{ + config => #{ + cluster => #{ + discovery_strategy => static, + static => #{seeds => NodeNames} + } + } + }}, + emqx_management + ], + Cluster = emqx_cth_cluster:start( + [ + {Core1, #{role => core, apps => AppSpec}}, + {Core2, #{role => core, apps => AppSpec}}, + {Core3, #{role => core, apps => AppSpec}}, + {Repl, #{role => replicant, apps => AppSpec}} + ], + #{work_dir => emqx_cth_suite:work_dir(TC, Config)} + ), + [{cluster, Cluster} | Config]; +init_per_testcase(_TC, Config) -> + Config. + +end_per_testcase(_TC, Config) -> + case ?config(cluster, Config) of + undefined -> ok; + Cluster -> emqx_cth_cluster:stop(Cluster) + end. + t_status(_Config) -> emqx_ctl:run_command([]), emqx_ctl:run_command(["status"]), @@ -263,3 +305,44 @@ t_admin(_Config) -> %% admins passwd # Reset dashboard user password %% admins del # Delete dashboard user ok. + +t_autocluster_leave(Config) -> + [Core1, Core2, Core3, Repl] = Cluster = ?config(cluster, Config), + %% Mria membership updates are async, makes sense to wait a little + timer:sleep(300), + ClusterView = [lists:sort(rpc:call(N, emqx, running_nodes, [])) || N <- Cluster], + [View1, View2, View3, View4] = ClusterView, + ?assertEqual(lists:sort(Cluster), View1), + ?assertEqual(View1, View2), + ?assertEqual(View1, View3), + ?assertEqual(View1, View4), + + rpc:call(Core3, emqx_mgmt_cli, cluster, [["leave"]]), + timer:sleep(1000), + %% Replicant node may still discover and join Core3 which is now split from [Core1, Core2], + %% but it's expected to choose a bigger cluster of [Core1, Core2].. + ?assertMatch([Core3], rpc:call(Core3, emqx, running_nodes, [])), + ?assertEqual(undefined, rpc:call(Core1, erlang, whereis, [ekka_autocluster])), + ?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Core1, emqx, running_nodes, [])), + ?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Core2, emqx, running_nodes, [])), + ?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Repl, emqx, running_nodes, [])), + + rpc:call(Repl, emqx_mgmt_cli, cluster, [["leave"]]), + timer:sleep(1000), + ?assertEqual(lists:sort([Core1, Core2]), rpc:call(Core1, emqx, running_nodes, [])), + ?assertEqual(lists:sort([Core1, Core2]), rpc:call(Core2, emqx, running_nodes, [])), + + rpc:call(Core3, emqx_mgmt_cli, cluster, [["discovery", "enable"]]), + rpc:call(Repl, emqx_mgmt_cli, cluster, [["discovery", "enable"]]), + %% core nodes will join and restart asyncly, may need more time to re-cluster + ?assertEqual( + ok, + emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> + [lists:sort(rpc:call(N, emqx, running_nodes, [])) || N <- Cluster] =:= ClusterView + end, + 10_000 + ) + ). diff --git a/changes/ce/fix-12802.en.md b/changes/ce/fix-12802.en.md new file mode 100644 index 000000000..f63603a97 --- /dev/null +++ b/changes/ce/fix-12802.en.md @@ -0,0 +1,3 @@ +Improve cluster discovery behaviour when a node is manually removed from a cluster using 'emqx ctl cluster leave' command. +Previously, if the configured cluster 'discovery_strategy' was not 'manual', the left node might re-discover and re-join the same cluster shortly after it left (unless it was stopped). +After this change, 'cluster leave' command disables automatic cluster_discovery, so that the left node won't re-join the same cluster again. Cluster discovery can be re-enabled by running 'emqx ctl discovery enable` or by restarting the left node. diff --git a/mix.exs b/mix.exs index 638bbb284..20df56e90 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.11.1", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.19.1", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.19.2", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:minirest, github: "emqx/minirest", tag: "1.4.0", override: true}, diff --git a/rebar.config b/rebar.config index 238dca515..1dd17c4e7 100644 --- a/rebar.config +++ b/rebar.config @@ -83,7 +83,7 @@ {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.1"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.2"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.0"}}}, From 6e0be5ad35b915f03db48c2cbc0c7e19e03e9801 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Thu, 28 Mar 2024 18:19:47 +0100 Subject: [PATCH 07/29] ci(release): bump emqx/upload-assets to 0.5.2 this version adds an option to skip uploading existing assets --- .github/workflows/release.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 1bed80376..0380b630b 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -67,12 +67,13 @@ jobs: BUCKET=${{ secrets.AWS_S3_BUCKET }} OUTPUT_DIR=${{ steps.profile.outputs.s3dir }} aws s3 cp --recursive s3://$BUCKET/$OUTPUT_DIR/${{ env.ref_name }} packages - - uses: emqx/upload-assets@8d2083b4dbe3151b0b735572eaa153b6acb647fe # 0.5.0 + - uses: emqx/upload-assets@974befcf0e72a1811360a81c798855efb66b0551 # 0.5.2 env: GITHUB_TOKEN: ${{ github.token }} with: asset_paths: '["packages/*"]' tag_name: "${{ env.ref_name }}" + skip_existing: true - name: update to emqx.io if: startsWith(env.ref_name, 'v') && ((github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts) run: | From f5a820cb102c9d9ae60a3d340960a053f0522cca Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 29 Mar 2024 13:09:08 +0200 Subject: [PATCH 08/29] fix(emqx_mgmt): catch OOM shutdown exits properly when calling a client conn process The exit reason is expected to include gen_server `Location`: `{{shutdown, OOMInfo}, Location}`. --- apps/emqx_management/src/emqx_mgmt.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 35908d3bd..df0450395 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -711,5 +711,7 @@ call_conn(ConnMod, Pid, Req) -> exit:R when R =:= shutdown; R =:= normal -> {error, shutdown}; exit:{R, _} when R =:= shutdown; R =:= noproc -> + {error, shutdown}; + exit:{{shutdown, _OOMInfo}, _Location} -> {error, shutdown} end. From 42af1f9d634b1408f537c233a7d1fe9801e7950b Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 29 Mar 2024 13:29:19 +0200 Subject: [PATCH 09/29] fix: handle internal timeout errors in client Mqueue/Inflight APIs --- apps/emqx_management/src/emqx_mgmt.erl | 14 +++++++++++++- apps/emqx_management/src/emqx_mgmt_api_clients.erl | 2 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index df0450395..bc194f03e 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -713,5 +713,17 @@ call_conn(ConnMod, Pid, Req) -> exit:{R, _} when R =:= shutdown; R =:= noproc -> {error, shutdown}; exit:{{shutdown, _OOMInfo}, _Location} -> - {error, shutdown} + {error, shutdown}; + exit:timeout -> + ?SLOG( + warning, + #{ + msg => "call_client_connection_process_timeout", + request => Req, + pid => Pid, + module => ConnMod, + stacktrace => erlang:process_info(Pid, current_stacktrace) + } + ), + {error, timeout} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index dd65c1245..262faf87f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1232,6 +1232,8 @@ list_client_msgs(MsgType, ClientID, QString) -> code => 'NOT_IMPLEMENTED', message => <<"API not implemented for persistent sessions">> }}; + {error, Reason} -> + ?INTERNAL_ERROR(Reason); {Msgs, Meta = #{}} when is_list(Msgs) -> format_msgs_resp(MsgType, Msgs, Meta, QString) end From 6cdf876684932d81f8bd063c85a175b5d9cc455c Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 29 Mar 2024 13:39:36 +0200 Subject: [PATCH 10/29] chore: add changelog --- changes/ce/fix-12814.en.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changes/ce/fix-12814.en.md diff --git a/changes/ce/fix-12814.en.md b/changes/ce/fix-12814.en.md new file mode 100644 index 000000000..f84025561 --- /dev/null +++ b/changes/ce/fix-12814.en.md @@ -0,0 +1,4 @@ +Handle several errors in `/clients/{clientid}/mqueue_messages` and `/clients/{clientid}/inflight_messages` APIs: + +- Internal timeout, which means that EMQX failed to get the list of Inflight/Mqueue messages within the default timeout of 5 s. This error may occur when the system is under a heavy load. The API will return 500 `{"code":"INTERNAL_ERROR","message":"timeout"}` response and log additional details. +- Client shutdown. The error may occur if the client connection is shutdown during the API call. The API will return 404 `{"code": "CLIENT_SHUTDOWN", "message": "Client connection has been shutdown"}` response in this case. From ceb04ba06da74061f10fce697027884549a3925b Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 1 Apr 2024 16:42:12 +0300 Subject: [PATCH 11/29] fix(emqx_mgmt): do not attempt to get a stacktrace of a remote client connection process --- apps/emqx_management/src/emqx_mgmt.erl | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index bc194f03e..0cde965f9 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -715,15 +715,20 @@ call_conn(ConnMod, Pid, Req) -> exit:{{shutdown, _OOMInfo}, _Location} -> {error, shutdown}; exit:timeout -> - ?SLOG( - warning, - #{ - msg => "call_client_connection_process_timeout", - request => Req, - pid => Pid, - module => ConnMod, - stacktrace => erlang:process_info(Pid, current_stacktrace) - } - ), + LogData = #{ + msg => "call_client_connection_process_timeout", + request => Req, + pid => Pid, + module => ConnMod + }, + LogData1 = + case node(Pid) =:= node() of + true -> + LogData#{stacktrace => erlang:process_info(Pid, current_stacktrace)}; + false -> + LogData + end, + + ?SLOG(warning, LogData1), {error, timeout} end. From 1a4cfc2a2d9b504df8734c4317a23b276edb1115 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 21 Mar 2024 17:30:36 +0800 Subject: [PATCH 12/29] fix(api_schema): removed metrics schema in api spec - Followup [PR#6622](https://github.com/emqx/emqx/pull/6622). --- apps/emqx_management/src/emqx_mgmt_api_metrics.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl index 8d61ee1fb..f2e302569 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl @@ -264,10 +264,10 @@ properties() -> "messages.qos0.received\fmessages.qos1.received and messages.qos2.received" >> ), - m( - 'messages.retained', - <<"Number of retained messages">> - ), + %% m( + %% 'messages.retained', + %% <<"Number of retained messages">> + %% ), m( 'messages.sent', << From 0f4b14829416b51b236d850a9eb8525168f32ac5 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 22 Mar 2024 18:03:13 +0800 Subject: [PATCH 13/29] refactor: uniform shared_sub table macros --- apps/emqx/include/emqx_shared_sub.hrl | 28 ++++++++++++ apps/emqx/src/emqx_shared_sub.erl | 66 +++++++++++++-------------- 2 files changed, 60 insertions(+), 34 deletions(-) create mode 100644 apps/emqx/include/emqx_shared_sub.hrl diff --git a/apps/emqx/include/emqx_shared_sub.hrl b/apps/emqx/include/emqx_shared_sub.hrl new file mode 100644 index 000000000..d744bd8a8 --- /dev/null +++ b/apps/emqx/include/emqx_shared_sub.hrl @@ -0,0 +1,28 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_SHARED_SUB_HRL). +-define(EMQX_SHARED_SUB_HRL, true). + +%% Mnesia table for shared sub message routing +-define(SHARED_SUBSCRIPTION, emqx_shared_subscription). + +%% ETS tables for Shared PubSub +-define(SHARED_SUBSCRIBER, emqx_shared_subscriber). +-define(ALIVE_SHARED_SUBSCRIBERS, emqx_alive_shared_subscribers). +-define(SHARED_SUBS_ROUND_ROBIN_COUNTER, emqx_shared_subscriber_round_robin_counter). + +-endif. diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index ce694ba33..54c107111 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -21,6 +21,7 @@ -include("emqx_schema.hrl"). -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-include("emqx_shared_sub.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -84,10 +85,7 @@ | hash_topic. -define(SERVER, ?MODULE). --define(TAB, emqx_shared_subscription). --define(SHARED_SUBS_ROUND_ROBIN_COUNTER, emqx_shared_subscriber_round_robin_counter). --define(SHARED_SUBS, emqx_shared_subscriber). --define(ALIVE_SUBS, emqx_alive_shared_subscribers). + -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5). -define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())). -define(ACK, shared_sub_ack). @@ -99,21 +97,21 @@ -record(state, {pmon}). --record(emqx_shared_subscription, {group, topic, subpid}). +-record(?SHARED_SUBSCRIPTION, {group, topic, subpid}). %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- create_tables() -> - ok = mria:create_table(?TAB, [ + ok = mria:create_table(?SHARED_SUBSCRIPTION, [ {type, bag}, {rlog_shard, ?SHARED_SUB_SHARD}, {storage, ram_copies}, - {record_name, emqx_shared_subscription}, - {attributes, record_info(fields, emqx_shared_subscription)} + {record_name, ?SHARED_SUBSCRIPTION}, + {attributes, record_info(fields, ?SHARED_SUBSCRIPTION)} ]), - [?TAB]. + [?SHARED_SUBSCRIPTION]. %%-------------------------------------------------------------------- %% API @@ -132,7 +130,7 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). record(Group, Topic, SubPid) -> - #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. + #?SHARED_SUBSCRIPTION{group = Group, topic = Topic, subpid = SubPid}. -spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). @@ -394,18 +392,18 @@ subscribers(Group, Topic, FailedSubs) -> %% Select ETS table to get all subscriber pids. subscribers(Group, Topic) -> - ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). + ets:select(?SHARED_SUBSCRIPTION, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> - ok = mria:wait_for_tables([?TAB]), - {ok, _} = mnesia:subscribe({table, ?TAB, simple}), + ok = mria:wait_for_tables([?SHARED_SUBSCRIPTION]), + {ok, _} = mnesia:subscribe({table, ?SHARED_SUBSCRIPTION, simple}), {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0), - ok = emqx_utils_ets:new(?SHARED_SUBS, [protected, bag]), - ok = emqx_utils_ets:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), + ok = emqx_utils_ets:new(?SHARED_SUBSCRIBER, [protected, bag]), + ok = emqx_utils_ets:new(?ALIVE_SHARED_SUBSCRIBERS, [protected, set, {read_concurrency, true}]), ok = emqx_utils_ets:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [ public, set, {write_concurrency, true} ]), @@ -413,26 +411,26 @@ init([]) -> init_monitors() -> mnesia:foldl( - fun(#emqx_shared_subscription{subpid = SubPid}, Mon) -> + fun(#?SHARED_SUBSCRIPTION{subpid = SubPid}, Mon) -> emqx_pmon:monitor(SubPid, Mon) end, emqx_pmon:new(), - ?TAB + ?SHARED_SUBSCRIPTION ). handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> - mria:dirty_write(?TAB, record(Group, Topic, SubPid)), - case ets:member(?SHARED_SUBS, {Group, Topic}) of + mria:dirty_write(?SHARED_SUBSCRIPTION, record(Group, Topic, SubPid)), + case ets:member(?SHARED_SUBSCRIBER, {Group, Topic}) of true -> ok; false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) end, ok = maybe_insert_alive_tab(SubPid), ok = maybe_insert_round_robin_count({Group, Topic}), - true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}), + true = ets:insert(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}), {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> - mria:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), - true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), + mria:dirty_delete_object(?SHARED_SUBSCRIPTION, record(Group, Topic, SubPid)), + true = ets:delete_object(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}), maybe_delete_round_robin_count({Group, Topic}), {reply, ok, update_stats(State)}; @@ -445,7 +443,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info( - {mnesia_table_event, {write, #emqx_shared_subscription{subpid = SubPid}, _}}, + {mnesia_table_event, {write, #?SHARED_SUBSCRIPTION{subpid = SubPid}, _}}, State = #state{pmon = PMon} ) -> ok = maybe_insert_alive_tab(SubPid), @@ -455,7 +453,7 @@ handle_info( %% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually %% be disconnected. % handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) -> -% #emqx_shared_subscription{subpid = SubPid} = OldRecord, +% #?SHARED_SUBSCRIPTION{subpid = SubPid} = OldRecord, % {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})}; handle_info({mnesia_table_event, _Event}, State) -> @@ -468,7 +466,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, _State) -> - mnesia:unsubscribe({table, ?TAB, simple}). + mnesia:unsubscribe({table, ?SHARED_SUBSCRIPTION, simple}). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -501,7 +499,7 @@ maybe_delete_round_robin_count({Group, _Topic} = GroupTopic) -> ok. if_no_more_subscribers(GroupTopic, Fn) -> - case ets:member(?SHARED_SUBS, GroupTopic) of + case ets:member(?SHARED_SUBSCRIBER, GroupTopic) of true -> ok; false -> Fn() end, @@ -510,26 +508,26 @@ if_no_more_subscribers(GroupTopic, Fn) -> %% keep track of alive remote pids maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok; maybe_insert_alive_tab(Pid) when is_pid(Pid) -> - ets:insert(?ALIVE_SUBS, {Pid}), + ets:insert(?ALIVE_SHARED_SUBSCRIBERS, {Pid}), ok. cleanup_down(SubPid) -> - ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid), + ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SHARED_SUBSCRIBERS, SubPid), lists:foreach( - fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> - ok = mria:dirty_delete_object(?TAB, Record), - true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), + fun(Record = #?SHARED_SUBSCRIPTION{topic = Topic, group = Group}) -> + ok = mria:dirty_delete_object(?SHARED_SUBSCRIPTION, Record), + true = ets:delete_object(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}), maybe_delete_round_robin_count({Group, Topic}), delete_route_if_needed({Group, Topic}) end, - mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid}) + mnesia:dirty_match_object(#?SHARED_SUBSCRIPTION{_ = '_', subpid = SubPid}) ). update_stats(State) -> emqx_stats:setstat( 'subscriptions.shared.count', 'subscriptions.shared.max', - ets:info(?TAB, size) + ets:info(?SHARED_SUBSCRIPTION, size) ), State. @@ -543,7 +541,7 @@ is_active_sub(Pid, FailedSubs, All) -> is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> erlang:is_process_alive(Pid); is_alive_sub(Pid) -> - [] =/= ets:lookup(?ALIVE_SUBS, Pid). + [] =/= ets:lookup(?ALIVE_SHARED_SUBSCRIBERS, Pid). delete_route_if_needed({Group, Topic} = GroupTopic) -> if_no_more_subscribers(GroupTopic, fun() -> From 50bceee9ab3a244c49b7e78403e346872b9107b0 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 22 Mar 2024 18:31:57 +0800 Subject: [PATCH 14/29] fix(stats): `'subscribers.count'` contains shared-subscriber --- apps/emqx/src/emqx_broker.erl | 18 ---------------- apps/emqx/src/emqx_broker_helper.erl | 31 +++++++++++++++++++++++++++- apps/emqx/test/emqx_broker_SUITE.erl | 2 +- apps/emqx/test/emqx_stats_SUITE.erl | 6 +++--- changes/ce/fix-12765.en.md | 2 ++ 5 files changed, 36 insertions(+), 23 deletions(-) create mode 100644 changes/ce/fix-12765.en.md diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index b20c3a15f..8c1239892 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -60,9 +60,6 @@ -export([topics/0]). -%% Stats fun --export([stats_fun/0]). - %% gen_server callbacks -export([ init/1, @@ -469,21 +466,6 @@ set_subopts(SubPid, Topic, NewOpts) -> topics() -> emqx_router:topics(). -%%-------------------------------------------------------------------- -%% Stats fun -%%-------------------------------------------------------------------- - -stats_fun() -> - safe_update_stats(?SUBSCRIBER, 'subscribers.count', 'subscribers.max'), - safe_update_stats(?SUBSCRIPTION, 'subscriptions.count', 'subscriptions.max'), - safe_update_stats(?SUBOPTION, 'suboptions.count', 'suboptions.max'). - -safe_update_stats(Tab, Stat, MaxStat) -> - case ets:info(Tab, size) of - undefined -> ok; - Size -> emqx_stats:setstat(Stat, MaxStat, Size) - end. - %%-------------------------------------------------------------------- %% call, cast, pick %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_broker_helper.erl b/apps/emqx/src/emqx_broker_helper.erl index 8562a1968..368398b92 100644 --- a/apps/emqx/src/emqx_broker_helper.erl +++ b/apps/emqx/src/emqx_broker_helper.erl @@ -18,6 +18,8 @@ -behaviour(gen_server). +-include("emqx_router.hrl"). +-include("emqx_shared_sub.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -33,6 +35,9 @@ reclaim_seq/1 ]). +%% Stats fun +-export([stats_fun/0]). + %% gen_server callbacks -export([ init/1, @@ -99,6 +104,30 @@ create_seq(Topic) -> reclaim_seq(Topic) -> emqx_sequence:reclaim(?SUBSEQ, Topic). +%%-------------------------------------------------------------------- +%% Stats fun +%%-------------------------------------------------------------------- + +stats_fun() -> + safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'), + safe_update_stats(table_size(?SUBSCRIPTION), 'subscriptions.count', 'subscriptions.max'), + safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max'). + +safe_update_stats(undefined, _Stat, _MaxStat) -> + ok; +safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) -> + emqx_stats:setstat(Stat, MaxStat, Val). + +subscriber_val() -> + sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)). + +sum_subscriber(undefined, undefined) -> undefined; +sum_subscriber(undefined, V2) when is_integer(V2) -> V2; +sum_subscriber(V1, undefined) when is_integer(V1) -> V1; +sum_subscriber(V1, V2) when is_integer(V1), is_integer(V2) -> V1 + V2. + +table_size(Tab) when is_atom(Tab) -> ets:info(Tab, size). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -115,7 +144,7 @@ init([]) -> %% SubMon: SubPid -> SubId ok = emqx_utils_ets:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]), %% Stats timer - ok = emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0), + ok = emqx_stats:update_interval(broker_stats, fun ?MODULE:stats_fun/0), {ok, #{pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index d4bb9e7fc..e106e3375 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -158,7 +158,7 @@ t_stats_fun(Config) when is_list(Config) -> ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>), ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>), %% ensure stats refreshed - emqx_broker:stats_fun(), + emqx_broker_helper:stats_fun(), %% emqx_stats:set_stat is a gen_server cast %% make a synced call sync ignored = gen_server:call(emqx_stats, call, infinity), diff --git a/apps/emqx/test/emqx_stats_SUITE.erl b/apps/emqx/test/emqx_stats_SUITE.erl index 1a672fa67..1c32396ce 100644 --- a/apps/emqx/test/emqx_stats_SUITE.erl +++ b/apps/emqx/test/emqx_stats_SUITE.erl @@ -105,10 +105,10 @@ t_helper(_) -> end end, [ - {"emqx_broker", MkTestFun(emqx_broker, stats_fun)}, - {"emqx_sm", MkTestFun(emqx_sm, stats_fun)}, + {"emqx_broker_helper", MkTestFun(emqx_broker_helper, stats_fun)}, {"emqx_router_helper", MkTestFun(emqx_router_helper, stats_fun)}, - {"emqx_cm", MkTestFun(emqx_cm, stats_fun)} + {"emqx_cm", MkTestFun(emqx_cm, stats_fun)}, + {"emqx_retainer", MkTestFun(emqx_retainer, stats_fun)} ]. with_proc(F) -> diff --git a/changes/ce/fix-12765.en.md b/changes/ce/fix-12765.en.md new file mode 100644 index 000000000..01c13146d --- /dev/null +++ b/changes/ce/fix-12765.en.md @@ -0,0 +1,2 @@ +Make sure stats `'subscribers.count'` `'subscribers.max'` countains shared-subscribers. +It only contains non-shared subscribers previously. From 50150423e1159159ab87d8a4d481c83309820d4b Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 2 Apr 2024 17:04:38 +0800 Subject: [PATCH 15/29] docs: rename change log file name due to cherry-pick --- changes/ce/{fix-12765.en.md => fix-12824.en.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changes/ce/{fix-12765.en.md => fix-12824.en.md} (100%) diff --git a/changes/ce/fix-12765.en.md b/changes/ce/fix-12824.en.md similarity index 100% rename from changes/ce/fix-12765.en.md rename to changes/ce/fix-12824.en.md From 5759ba5162a2ba400fa33c94d2919cde47e5f2c9 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 2 Apr 2024 17:09:22 +0800 Subject: [PATCH 16/29] chore: bump app version --- apps/emqx/src/emqx.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 1d8c55fe9..462b7e74b 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -2,7 +2,7 @@ {application, emqx, [ {id, "emqx"}, {description, "EMQX Core"}, - {vsn, "5.2.0"}, + {vsn, "5.2.1"}, {modules, []}, {registered, []}, {applications, [ From 319ec50c0d394f87b97dce0e6a5e1f609da4dc71 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 3 Apr 2024 11:57:01 +0800 Subject: [PATCH 17/29] fix: source bridges missing after restore the backup files --- apps/emqx/src/bhvrs/emqx_config_backup.erl | 15 ++++-- apps/emqx/src/emqx.app.src | 2 +- apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 21 ++++++++- .../src/emqx_mgmt_data_backup.erl | 47 +++++++++++++------ changes/ce/fix-12826.en.md | 18 +++++++ 6 files changed, 83 insertions(+), 22 deletions(-) create mode 100644 changes/ce/fix-12826.en.md diff --git a/apps/emqx/src/bhvrs/emqx_config_backup.erl b/apps/emqx/src/bhvrs/emqx_config_backup.erl index e4818a871..1ec08c23b 100644 --- a/apps/emqx/src/bhvrs/emqx_config_backup.erl +++ b/apps/emqx/src/bhvrs/emqx_config_backup.erl @@ -16,9 +16,14 @@ -module(emqx_config_backup). +-type ok_result() :: #{ + root_key => emqx_utils_maps:config_key(), + changed => [emqx_utils_maps:config_key_path()] +}. + +-type error_result() :: #{root_key => emqx_utils_maps:config_key(), reason => term()}. + -callback import_config(RawConf :: map()) -> - {ok, #{ - root_key => emqx_utils_maps:config_key(), - changed => [emqx_utils_maps:config_key_path()] - }} - | {error, #{root_key => emqx_utils_maps:config_key(), reason => term()}}. + {ok, ok_result()} + | {error, error_result()} + | {results, {[ok_result()], [error_result()]}}. diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 1d8c55fe9..462b7e74b 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -2,7 +2,7 @@ {application, emqx, [ {id, "emqx"}, {description, "EMQX Core"}, - {vsn, "5.2.0"}, + {vsn, "5.2.1"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 9ef567f23..57dbc26ba 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.34"}, + {vsn, "0.1.35"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index e834dc42e..10d597d36 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1030,7 +1030,26 @@ bridge_v2_type_to_connector_type(Type) -> import_config(RawConf) -> %% actions structure - emqx_bridge:import_config(RawConf, <<"actions">>, ?ROOT_KEY_ACTIONS, config_key_path()). + ActionRes = emqx_bridge:import_config( + RawConf, <<"actions">>, ?ROOT_KEY_ACTIONS, config_key_path() + ), + SourceRes = emqx_bridge:import_config( + RawConf, <<"sources">>, ?ROOT_KEY_SOURCES, config_key_path_sources() + ), + combine_import_results([ActionRes, SourceRes]). + +combine_import_results(Results0) -> + Results = lists:foldr( + fun + ({ok, OkRes}, {OkAcc, ErrAcc}) -> + {[OkRes | OkAcc], ErrAcc}; + ({error, ErrRes}, {OkAcc, ErrAcc}) -> + {OkAcc, [ErrRes | ErrAcc]} + end, + {[], []}, + Results0 + ), + {results, Results}. %%==================================================================== %% Config Update Handler API diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 2aaa014a8..03eb7ac06 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -773,23 +773,42 @@ validate_cluster_hocon(RawConf) -> do_import_conf(RawConf, Opts) -> GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))), maybe_print_conf_errors(GenConfErrs, Opts), - Errors = - lists:foldl( - fun(Module, ErrorsAcc) -> - case Module:import_config(RawConf) of - {ok, #{changed := Changed}} -> - maybe_print_changed(Changed, Opts), - ErrorsAcc; - {error, #{root_key := RootKey, reason := Reason}} -> - ErrorsAcc#{[RootKey] => Reason} - end - end, - GenConfErrs, - sort_importer_modules(find_behaviours(emqx_config_backup)) - ), + Modules = sort_importer_modules(find_behaviours(emqx_config_backup)), + Errors = lists:foldl(print_ok_results_collect_errors(RawConf, Opts), GenConfErrs, Modules), maybe_print_conf_errors(Errors, Opts), Errors. +print_ok_results_collect_errors(RawConf, Opts) -> + fun(Module, Errors) -> + case Module:import_config(RawConf) of + {results, {OkResults, ErrResults}} -> + print_ok_results(OkResults, Opts), + collect_errors(ErrResults, Errors); + {ok, OkResult} -> + print_ok_results([OkResult], Opts), + Errors; + {error, ErrResult} -> + collect_errors([ErrResult], Errors) + end + end. + +print_ok_results(Results, Opts) -> + lists:foreach( + fun(#{changed := Changed}) -> + maybe_print_changed(Changed, Opts) + end, + Results + ). + +collect_errors(Results, Errors) -> + lists:foldr( + fun(#{root_key := RootKey, reason := Reason}, Acc) -> + Acc#{[RootKey] => Reason} + end, + Errors, + Results + ). + sort_importer_modules(Modules) -> lists:sort( fun(M1, M2) -> order(M1, ?IMPORT_ORDER) =< order(M2, ?IMPORT_ORDER) end, diff --git a/changes/ce/fix-12826.en.md b/changes/ce/fix-12826.en.md new file mode 100644 index 000000000..51255059d --- /dev/null +++ b/changes/ce/fix-12826.en.md @@ -0,0 +1,18 @@ +Cannot import `sources` from backup files. + +Before the fix, the following configs in backup files cannot be imported: + +``` +sources { + mqtt { + source_c384b174 { + connector = source_connector_c8287217 + enable = true + parameters { + qos = 0 + topic = "t/#" + } + } + } +} +``` From 9d1a69aaa9ca77fce31d913847a70d763d3bc15e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 3 Apr 2024 18:39:32 +0800 Subject: [PATCH 18/29] fix: cannot import retained messages --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 4 ++-- apps/emqx_retainer/src/emqx_retainer.app.src | 2 +- .../src/emqx_retainer_mnesia.erl | 9 +++++++++ changes/ce/fix-12826.en.md | 20 ++++--------------- 4 files changed, 16 insertions(+), 19 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 10d597d36..e6feac7bd 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1036,9 +1036,9 @@ import_config(RawConf) -> SourceRes = emqx_bridge:import_config( RawConf, <<"sources">>, ?ROOT_KEY_SOURCES, config_key_path_sources() ), - combine_import_results([ActionRes, SourceRes]). + group_import_results([ActionRes, SourceRes]). -combine_import_results(Results0) -> +group_import_results(Results0) -> Results = lists:foldr( fun ({ok, OkRes}, {OkAcc, ErrAcc}) -> diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 248cc9310..4a8b3cdc3 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.21"}, + {vsn, "5.0.22"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index bdc1f2c67..7e2a73a09 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -17,6 +17,7 @@ -module(emqx_retainer_mnesia). -behaviour(emqx_retainer). +-behaviour(emqx_db_backup). -include("emqx_retainer.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -54,6 +55,8 @@ -export([populate_index_meta/0]). -export([reindex/3]). +-export([backup_tables/0]). + -record(retained_message, {topic, msg, expiry_time}). -record(retained_index, {key, expiry_time}). -record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}). @@ -73,6 +76,12 @@ topics() -> [emqx_topic:join(I) || I <- mnesia:dirty_all_keys(?TAB_MESSAGE)]. +%%-------------------------------------------------------------------- +%% Data backup +%%-------------------------------------------------------------------- +backup_tables() -> + [?TAB_MESSAGE]. + %%-------------------------------------------------------------------- %% emqx_retainer callbacks %%-------------------------------------------------------------------- diff --git a/changes/ce/fix-12826.en.md b/changes/ce/fix-12826.en.md index 51255059d..28829cf87 100644 --- a/changes/ce/fix-12826.en.md +++ b/changes/ce/fix-12826.en.md @@ -1,18 +1,6 @@ -Cannot import `sources` from backup files. +Fixed an issue that prevented importing source data integrations and retained messages. -Before the fix, the following configs in backup files cannot be imported: +Before the fix: -``` -sources { - mqtt { - source_c384b174 { - connector = source_connector_c8287217 - enable = true - parameters { - qos = 0 - topic = "t/#" - } - } - } -} -``` +- source data integrations are ignored from the backup file +- importing the `mnesia` table for retained messages are not supported From 1c81c79a2cb5f6713e250a8b1f795845e01fef54 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 7 Apr 2024 17:24:26 +0800 Subject: [PATCH 19/29] chore: add testcase for importing retained msgs and sources --- .../test/emqx_mgmt_data_backup_SUITE.erl | 23 ++++++++++++++++++ ...emqx-export-4.4.24-retainer-mqttsub.tar.gz | Bin 0 -> 2352 bytes 2 files changed, 23 insertions(+) create mode 100644 apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-4.4.24-retainer-mqttsub.tar.gz diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index 36a838743..e1d0a2512 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -18,6 +18,7 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -86,6 +87,28 @@ t_empty_export_import(_Config) -> ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)), ?assertEqual(ExpRawConf, emqx:get_raw_config([])). +t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) -> + FNameEmqx44 = "emqx-export-4.4.24-retainer-mqttsub.tar.gz", + BackupFile = filename:join(?config(data_dir, Config), FNameEmqx44), + Exp = {ok, #{db_errors => #{}, config_errors => #{}}}, + ?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)), + RawConfAfterImport = emqx:get_raw_config([]), + %% verify that MQTT sources are imported + ?assertMatch( + #{<<"sources">> := #{<<"mqtt">> := Sources}} when map_size(Sources) > 0, + RawConfAfterImport + ), + %% verify that retainer messages are imported + ?assertMatch( + {ok, [#message{payload = <<"test-payload">>}]}, + emqx_retainer:read_message(<<"test-retained-message/1">>) + ), + %% Export and import again + {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), + ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)), + ?assertEqual(RawConfAfterImport, emqx:get_raw_config([])), + ok. + t_cluster_hocon_export_import(Config) -> RawConfBeforeImport = emqx:get_raw_config([]), BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP), diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-4.4.24-retainer-mqttsub.tar.gz b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-4.4.24-retainer-mqttsub.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..67133f19d7e2958933f5fef09d07ccbfd64883e5 GIT binary patch literal 2352 zcmV-03D5Q)iwFR|V-jWn1MONtY$Hh(ZO4wCbu!Gb(r8bhmS>ikLAKLvw{3Q$MYBNI zIcx;GGl+rN&?d;Ify!a07`O7fv880)(_e;=oFrR^kF2k>J7s zB(Ci8tGc>lcbqt$buw(|A1Utszv}<`|5v^GRW|PL?pN@>=Mul7RkdnOs}SrX%fY15 z-}QYuXs=5jlN3d1H5xMiY&Q9)Qq#guL9(hf6-{fZ&AKKlYO7UOB)Rc%4WEVv)JFt{ zVq1g#;R)@aS*LS?T=)@xenJSJBL9XBLBgcka}C#d3L6lQrluX6|31TRsS1 z$};td<#cb}8r`an^*F9Go#;)=^24bA)+_}Biiv~zSZH9c@?$yTjtvC3N2Lb@{t?{8 z`_N*RDZ{o{noMCTd<)EP-bFhIHN0D^I~R>Ybnco9+l-NSYl9A z%+krWU~h6-E(9`SeFxm1GGB+~l8I=q?IL29M&&qt<%gx~_r7&+{aR^jR6W{L;7duZ zwW+Af)R7y0xT7eEZhLDkkt@xNAQ2EI4Z&> z?O?-q3FY9-c5#8yA$IF=1D(nAcswKq^ER=}F4p_5$t@t8MvxvY8{H$gW9@@9)lkS;ChJCz$GT%5h9-J+B&r!v6gDYBd+i2doZ4tYO!9(V%T^r9g&FIumYm3i90e9q6iHRN5do|2w+qI@o=<}v4b5`gy!fj*GK!h z=wRx7OeyN}Jx>WE4p9b%E^_H+S2x?y7k$)?>%CHM%-RL4#ihRt0Ix9ZTza8`tKg||CynncAz zR15G)8$W@Zbm;_x%>nKP3;Oi~@iHn+G}(1&D29iGC=7Hc4WAPYIHrVh{+JN6oWsm@ z0d0H>3o{TU&ZE^E-vD{#>!EyjW1{nD?71^=3{fKyEP`B2Hrx`QCT3ytMElYkQ3pAm zEFI$Z<;~ac-;awc-<56GfMkXk8I5CFjYW6uWQBEa8^a*b()M(;9;bjA9nPoZyCsU~OQwJ;?{*1W?OyHkq1A zn9qr!KI`K70?=vBf14ZkUwP`opEUFQrzrJS;{2yJlzMvpI}gI9C6Ec@BJ846z*M8! zWV`6I9$rdkOs84@eFswuJ^eM{-1mPqwWet4{oi@e9QkKWb>w+-+n9m>s;V@T_kXRr zmg4_;P-ZEcGahWN!sFfcH(z_@d%b*6wB4@W!^p%WD6;dV z9M^&4;T^~Axzz7v+me+3Sz9W+daW(3?S2-FMGf)JG-MKEU~R7OHm+k2$2u1I!3hqYm>`KM zLIxDLlpk-w2so+-#PTtSR{Ch$h38?g%nyseTDBY$?=w9)>hwMKaETY^tj_h}=zG|7 zm0$zR=ttZtBHxD!c;JU4F9^>I$g}hvJPaWakan-LYWoy{xk$y}6zH#jpSQyBD05%zGbx z#HfIhQHD+hQ71sJCA5U3?d@kfhf{ibniu~gVRY4T8XGh4zYd_E!2gEUYNYso9&|SN z|NYN_=D#NK|LW8kG`Sf}Zxinfs~p7v{Vu`|aV2?gcyy=82Fv{3Bgpn?H&_*SeZJMl z3kUyS{ro`${$KphoZ$a{nCjdKDPHQszZ|>oW)Fonad$w`cCQ!|L@UO>BIU0>{+D(4 zC;z@9NyRsp|AHj3krv8VcWhp750@rXiE*T;rGQ!XRUOiYdGk z-xWJ~>HQzP{mXYUOoN1&n#^o}a#Q`8qMor=+h(+;~Kok}nh-mg46NguSB;xR=1;pXnNpTS4VC`NI`yR5r9;((V zTBlxnNo#6T!l9mWo?nIK>qi@@Ss8OQqfT}G4|ji9V6LydEoD%Kv9qeG3dhZXClKVQ z=lNiTe_dj@c93HmHoRj@_+rhWCkVz3t9T{)vV&<}R-3if7lP;FuUSwo8NQ%+h!os= z9;c~Jt@FIDahbT!hrq3U@MJu2DgCd~y!BrjIgtGMY|J|UE1Ht1|LU5er1jr<(Am^~ zdl}ID+hqN>IyQqQtG;o`#kvK^$4(_GyX7Do^xR;nJ!GJcsOtnvaJ*k0IKE{sytw-y z|Cy+``%ZeYPbsE(@!z96bK97W|H<$FYOO|k{y!Hw8~nF_3YvdE{r(qb(6swsU|-Br z{&V1&4q`H#cD&CssbvP2Xb*)RvrFJ$8Snc9EqojJ_l`G5hrp7dRqUV3!M%AKY0f<|2>KSpFT2!CO3kk@(a2na7eeDj(c=R$OVN^ z3R^<-d)e?S!5l@lAFNqarvm_Z)@tGV!^%It5*-tMJntQFN}oiU8~+V^ZW}Y}|620< zzgkPJrTBjybT;_^2;MU99`pMT!=C#455vBgXg}^=0_>N Date: Sun, 7 Apr 2024 18:23:59 +0800 Subject: [PATCH 20/29] ci: run emqx_management both with ee and ce profile --- .../test/emqx_mgmt_data_backup_SUITE.erl | 43 +++++++++++-------- scripts/find-apps.sh | 4 ++ 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index e1d0a2512..fee392479 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -88,25 +88,30 @@ t_empty_export_import(_Config) -> ?assertEqual(ExpRawConf, emqx:get_raw_config([])). t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) -> - FNameEmqx44 = "emqx-export-4.4.24-retainer-mqttsub.tar.gz", - BackupFile = filename:join(?config(data_dir, Config), FNameEmqx44), - Exp = {ok, #{db_errors => #{}, config_errors => #{}}}, - ?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)), - RawConfAfterImport = emqx:get_raw_config([]), - %% verify that MQTT sources are imported - ?assertMatch( - #{<<"sources">> := #{<<"mqtt">> := Sources}} when map_size(Sources) > 0, - RawConfAfterImport - ), - %% verify that retainer messages are imported - ?assertMatch( - {ok, [#message{payload = <<"test-payload">>}]}, - emqx_retainer:read_message(<<"test-retained-message/1">>) - ), - %% Export and import again - {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), - ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)), - ?assertEqual(RawConfAfterImport, emqx:get_raw_config([])), + case emqx_release:edition() of + ce -> + ok; + ee -> + FNameEmqx44 = "emqx-export-4.4.24-retainer-mqttsub.tar.gz", + BackupFile = filename:join(?config(data_dir, Config), FNameEmqx44), + Exp = {ok, #{db_errors => #{}, config_errors => #{}}}, + ?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)), + RawConfAfterImport = emqx:get_raw_config([]), + %% verify that MQTT sources are imported + ?assertMatch( + #{<<"sources">> := #{<<"mqtt">> := Sources}} when map_size(Sources) > 0, + RawConfAfterImport + ), + %% verify that retainer messages are imported + ?assertMatch( + {ok, [#message{payload = <<"test-payload">>}]}, + emqx_retainer:read_message(<<"test-retained-message/1">>) + ), + %% Export and import again + {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), + ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)), + ?assertEqual(RawConfAfterImport, emqx:get_raw_config([])) + end, ok. t_cluster_hocon_export_import(Config) -> diff --git a/scripts/find-apps.sh b/scripts/find-apps.sh index 89f0a66e5..908f22d9c 100755 --- a/scripts/find-apps.sh +++ b/scripts/find-apps.sh @@ -101,6 +101,10 @@ matrix() { entries+=("$(format_app_entry "$app" 1 emqx "$runner")") entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")") ;; + apps/emqx_management) + entries+=("$(format_app_entry "$app" 1 emqx "$runner")") + entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")") + ;; apps/*) if [[ -f "${app}/BSL.txt" ]]; then profile='emqx-enterprise' From 698b8e6a05c600d0de4469bd99f9a69936ad73dc Mon Sep 17 00:00:00 2001 From: Kinplemelon Date: Tue, 9 Apr 2024 09:37:49 +0800 Subject: [PATCH 21/29] chore(dashboard): bump dashboard version to v1.8.1 & e1.6.1 --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 1753dd4d8..2f01ad16e 100644 --- a/Makefile +++ b/Makefile @@ -20,8 +20,8 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 -export EMQX_DASHBOARD_VERSION ?= v1.8.0 -export EMQX_EE_DASHBOARD_VERSION ?= e1.6.0 +export EMQX_DASHBOARD_VERSION ?= v1.8.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.6.1 PROFILE ?= emqx REL_PROFILES := emqx emqx-enterprise From a1495689c0d890903353ff052f306ee76f133728 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 8 Apr 2024 10:21:00 +0800 Subject: [PATCH 22/29] fix: clean self node's cluster commit when leave cluster --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 3 ++- changes/ce/fix-12843.en.md | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 changes/ce/fix-12843.en.md diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 39d471ed9..37f052f56 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -224,6 +224,7 @@ reset() -> gen_server:call(?MODULE, reset). status() -> transaction(fun ?MODULE:trans_status/0, []). +%% DO NOT delete this on_leave_clean/0, It's use when rpc before v560. on_leave_clean() -> on_leave_clean(node()). @@ -367,7 +368,7 @@ handle_call({fast_forward_to_commit, ToTnxId}, _From, State) -> NodeId = do_fast_forward_to_commit(ToTnxId, State), {reply, NodeId, State, catch_up(State)}; handle_call(on_leave, _From, State) -> - {atomic, ok} = transaction(fun ?MODULE:on_leave_clean/0, []), + {atomic, ok} = transaction(fun ?MODULE:on_leave_clean/1, [node()]), {reply, ok, State#{is_leaving := true}}; handle_call(_, _From, State) -> {reply, ok, State, catch_up(State)}. diff --git a/changes/ce/fix-12843.en.md b/changes/ce/fix-12843.en.md new file mode 100644 index 000000000..f0ba2af8c --- /dev/null +++ b/changes/ce/fix-12843.en.md @@ -0,0 +1,2 @@ +Fixed cluster_rpc_commit tnx_id was not properly cleanup after 'cluster leave' on replicator nodes, +The tnx_id of the core node will be deleted before, resulting in the failure of the core node update configuration. From 5579086220b77b2914c0d7f9844f83df022aa330 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 9 Apr 2024 14:15:18 +0800 Subject: [PATCH 23/29] chore: replicantor -> replicant --- changes/ce/fix-12843.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ce/fix-12843.en.md b/changes/ce/fix-12843.en.md index f0ba2af8c..38a46273f 100644 --- a/changes/ce/fix-12843.en.md +++ b/changes/ce/fix-12843.en.md @@ -1,2 +1,2 @@ -Fixed cluster_rpc_commit tnx_id was not properly cleanup after 'cluster leave' on replicator nodes, +Fixed cluster_rpc_commit tnx_id was not properly cleanup after 'cluster leave' on replicant nodes, The tnx_id of the core node will be deleted before, resulting in the failure of the core node update configuration. From 838113291931ae9cc06ea732b70e531340909269 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 9 Apr 2024 14:44:11 +0800 Subject: [PATCH 24/29] chore: bump emqx_conf to 0.1.36 --- apps/emqx_conf/src/emqx_conf.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index dedb0c3c6..0646bc255 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.35"}, + {vsn, "0.1.36"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib]}, From d393e963798a443887ed864d45d1e1efef00c820 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 9 Apr 2024 16:56:41 +0800 Subject: [PATCH 25/29] chore: Apply suggestions from code review Co-authored-by: ieQu1 <99872536+ieQu1@users.noreply.github.com> --- changes/ce/fix-12843.en.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/changes/ce/fix-12843.en.md b/changes/ce/fix-12843.en.md index 38a46273f..000026c00 100644 --- a/changes/ce/fix-12843.en.md +++ b/changes/ce/fix-12843.en.md @@ -1,2 +1,2 @@ -Fixed cluster_rpc_commit tnx_id was not properly cleanup after 'cluster leave' on replicant nodes, -The tnx_id of the core node will be deleted before, resulting in the failure of the core node update configuration. +Fixed `cluster_rpc_commit` transaction ID cleanup procedure after `cluster leave` on replicant nodes. +Previously, the transaction id of the core node would be deleted prematurely, blocking configuration updates on the core node. From 55179ccfeda75bbf31f9d38fe1a6c47f4f74f5fc Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 9 Apr 2024 19:04:38 +0300 Subject: [PATCH 26/29] chore: update ekka to 0.19.3 Included updates: - https://github.com/emqx/mria/pull/178 --- apps/emqx/rebar.config | 2 +- mix.exs | 2 +- rebar.config | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 70cf636e7..99b8a21a4 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,7 +28,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.2"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.1"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, diff --git a/mix.exs b/mix.exs index 2ebbc7e66..87fd4d62e 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.11.1", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.19.2", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.19.3", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:minirest, github: "emqx/minirest", tag: "1.4.0", override: true}, diff --git a/rebar.config b/rebar.config index 537707f4a..b5f5f1d9a 100644 --- a/rebar.config +++ b/rebar.config @@ -83,7 +83,7 @@ {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.2"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.0"}}}, From fae9005f87d9b4925d8a3a19328e2fb368793f29 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 9 Apr 2024 19:06:13 +0300 Subject: [PATCH 27/29] test(emqx_mgmt_cli): test that replicants do not join a left core node --- .../test/emqx_mgmt_cli_SUITE.erl | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl index c6f00bff0..b1d646b40 100644 --- a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl @@ -33,12 +33,12 @@ end_per_suite(_) -> emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]). init_per_testcase(t_autocluster_leave = TC, Config) -> - [Core1, Core2, Core3, Repl] = + [Core1, Core2, Repl1, Repl2] = Nodes = [ t_autocluster_leave_core1, t_autocluster_leave_core2, - t_autocluster_leave_core3, - t_autocluster_leave_replicant + t_autocluster_leave_replicant1, + t_autocluster_leave_replicant2 ], NodeNames = [emqx_cth_cluster:node_name(N) || N <- Nodes], @@ -58,8 +58,8 @@ init_per_testcase(t_autocluster_leave = TC, Config) -> [ {Core1, #{role => core, apps => AppSpec}}, {Core2, #{role => core, apps => AppSpec}}, - {Core3, #{role => core, apps => AppSpec}}, - {Repl, #{role => replicant, apps => AppSpec}} + {Repl1, #{role => replicant, apps => AppSpec}}, + {Repl2, #{role => replicant, apps => AppSpec}} ], #{work_dir => emqx_cth_suite:work_dir(TC, Config)} ), @@ -307,7 +307,7 @@ t_admin(_Config) -> ok. t_autocluster_leave(Config) -> - [Core1, Core2, Core3, Repl] = Cluster = ?config(cluster, Config), + [Core1, Core2, Repl1, Repl2] = Cluster = ?config(cluster, Config), %% Mria membership updates are async, makes sense to wait a little timer:sleep(300), ClusterView = [lists:sort(rpc:call(N, emqx, running_nodes, [])) || N <- Cluster], @@ -317,24 +317,24 @@ t_autocluster_leave(Config) -> ?assertEqual(View1, View3), ?assertEqual(View1, View4), - rpc:call(Core3, emqx_mgmt_cli, cluster, [["leave"]]), + rpc:call(Core2, emqx_mgmt_cli, cluster, [["leave"]]), timer:sleep(1000), - %% Replicant node may still discover and join Core3 which is now split from [Core1, Core2], - %% but it's expected to choose a bigger cluster of [Core1, Core2].. - ?assertMatch([Core3], rpc:call(Core3, emqx, running_nodes, [])), + %% Replicant nodes can discover Core2 which is now split from [Core1, Core2], + %% but they are expected to ignore Core2, + %% since mria_lb must filter out core nodes that disabled discovery. + ?assertMatch([Core2], rpc:call(Core2, emqx, running_nodes, [])), ?assertEqual(undefined, rpc:call(Core1, erlang, whereis, [ekka_autocluster])), - ?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Core1, emqx, running_nodes, [])), - ?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Core2, emqx, running_nodes, [])), - ?assertEqual(lists:sort([Core1, Core2, Repl]), rpc:call(Repl, emqx, running_nodes, [])), + ?assertEqual(lists:sort([Core1, Repl1, Repl2]), rpc:call(Core1, emqx, running_nodes, [])), + ?assertEqual(lists:sort([Core1, Repl1, Repl2]), rpc:call(Repl1, emqx, running_nodes, [])), + ?assertEqual(lists:sort([Core1, Repl1, Repl2]), rpc:call(Repl2, emqx, running_nodes, [])), - rpc:call(Repl, emqx_mgmt_cli, cluster, [["leave"]]), + rpc:call(Repl1, emqx_mgmt_cli, cluster, [["leave"]]), timer:sleep(1000), - ?assertEqual(lists:sort([Core1, Core2]), rpc:call(Core1, emqx, running_nodes, [])), - ?assertEqual(lists:sort([Core1, Core2]), rpc:call(Core2, emqx, running_nodes, [])), + ?assertEqual(lists:sort([Core1, Repl2]), rpc:call(Core1, emqx, running_nodes, [])), - rpc:call(Core3, emqx_mgmt_cli, cluster, [["discovery", "enable"]]), - rpc:call(Repl, emqx_mgmt_cli, cluster, [["discovery", "enable"]]), - %% core nodes will join and restart asyncly, may need more time to re-cluster + rpc:call(Core2, emqx_mgmt_cli, cluster, [["discovery", "enable"]]), + rpc:call(Repl1, emqx_mgmt_cli, cluster, [["discovery", "enable"]]), + %% nodes will join and restart asyncly, may need more time to re-cluster ?assertEqual( ok, emqx_common_test_helpers:wait_for( From 3b7cade6715514508f0f05dd8267a5dec30906ee Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Thu, 11 Apr 2024 13:49:40 +0200 Subject: [PATCH 28/29] chore: 5.6.1-beta.1 --- apps/emqx/include/emqx_release.hrl | 4 ++-- deploy/charts/emqx-enterprise/Chart.yaml | 4 ++-- deploy/charts/emqx/Chart.yaml | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 9679510d9..2637a0270 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,7 +32,7 @@ %% `apps/emqx/src/bpapi/README.md' %% Opensource edition --define(EMQX_RELEASE_CE, "5.6.0"). +-define(EMQX_RELEASE_CE, "5.6.1-beta.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.6.0"). +-define(EMQX_RELEASE_EE, "5.6.1-beta.1"). diff --git a/deploy/charts/emqx-enterprise/Chart.yaml b/deploy/charts/emqx-enterprise/Chart.yaml index 0fd47100b..573277cac 100644 --- a/deploy/charts/emqx-enterprise/Chart.yaml +++ b/deploy/charts/emqx-enterprise/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.6.0 +version: 5.6.1-beta.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.6.0 +appVersion: 5.6.1-beta.1 diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index b1ae7ff66..e771499b6 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.6.0 +version: 5.6.1-beta.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.6.0 +appVersion: 5.6.1-beta.1 From b27fc0da2645150141b660b4b9878ac63b23cdc3 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Thu, 11 Apr 2024 15:24:41 +0200 Subject: [PATCH 29/29] test(emqx_machine): ensure node is down before testing open ports --- apps/emqx_machine/test/emqx_machine_SUITE.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/emqx_machine/test/emqx_machine_SUITE.erl b/apps/emqx_machine/test/emqx_machine_SUITE.erl index d8bd01c00..d9301aba4 100644 --- a/apps/emqx_machine/test/emqx_machine_SUITE.erl +++ b/apps/emqx_machine/test/emqx_machine_SUITE.erl @@ -144,7 +144,13 @@ t_open_ports_check(Config) -> ?assertEqual(ok, erpc:call(Core2, emqx_machine, open_ports_check, [])), ?assertEqual(ok, erpc:call(Replicant, emqx_machine, open_ports_check, [])), + true = erlang:monitor_node(Core2, true), ok = emqx_cth_cluster:stop_node(Core2), + receive + {nodedown, Core2} -> ok + after 10000 -> + ct:fail("nodedown message not received after 10 seconds.") + end, ?assertEqual(ok, erpc:call(Replicant, emqx_machine, open_ports_check, [])), Results = erpc:call(Core1, emqx_machine, open_ports_check, []),