From 51ec8cb8fad376af3e12aff9c98535837dafcee5 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 27 Jun 2023 12:05:10 +0000 Subject: [PATCH 1/6] fix(emqx_conf): don't sync the MFA to a leaved node --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 25 +++++++++++++++++----- apps/emqx_management/src/emqx_mgmt_cli.erl | 1 + 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 9e930e693..32a91813f 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -28,7 +28,8 @@ reset/0, status/0, skip_failed_commit/1, - fast_forward_to_commit/2 + fast_forward_to_commit/2, + on_leave/0 ]). -export([ commit/2, @@ -40,7 +41,8 @@ make_initiate_call_req/3, read_next_mfa/1, trans_query/1, - trans_status/0 + trans_status/0, + on_leave_clean/0 ]). -export([ @@ -211,6 +213,9 @@ reset() -> gen_server:call(?MODULE, reset). status() -> transaction(fun ?MODULE:trans_status/0, []). +on_leave_clean() -> + mnesia:delete({?CLUSTER_COMMIT, node()}). + -spec latest_tnx_id() -> pos_integer(). latest_tnx_id() -> {atomic, TnxId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []), @@ -264,6 +269,10 @@ skip_failed_commit(Node) -> -spec fast_forward_to_commit(node(), pos_integer()) -> pos_integer(). fast_forward_to_commit(Node, ToTnxId) -> gen_server:call({?MODULE, Node}, {fast_forward_to_commit, ToTnxId}). + +%% It is necessary to clean this node commit record in the cluster +on_leave() -> + gen_server:call(?MODULE, on_leave). 
%%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -271,7 +280,7 @@ fast_forward_to_commit(Node, ToTnxId) -> %% @private init([Node, RetryMs]) -> {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), - State = #{node => Node, retry_interval => RetryMs}, + State = #{node => Node, retry_interval => RetryMs, is_leaving => false}, %% The init transaction ID is set in emqx_conf_app after %% it has fetched the latest config from one of the core nodes TnxId = emqx_app:get_init_tnx_id(), @@ -306,6 +315,9 @@ handle_call(skip_failed_commit, _From, State = #{node := Node}) -> handle_call({fast_forward_to_commit, ToTnxId}, _From, State) -> NodeId = do_fast_forward_to_commit(ToTnxId, State), {reply, NodeId, State, catch_up(State)}; +handle_call(on_leave, _From, State) -> + {atomic, ok} = transaction(fun ?MODULE:on_leave_clean/0, []), + {reply, ok, State#{is_leaving := true}}; handle_call(_, _From, State) -> {reply, ok, State, catch_up(State)}. @@ -328,7 +340,7 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== catch_up(State) -> catch_up(State, false). -catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> +catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State, SkipResult) -> case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of {atomic, caught_up} -> ?TIMEOUT; @@ -353,7 +365,10 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> {aborted, Reason} -> ?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}), RetryMs - end. + end; +catch_up(#{is_leaving := true}, _SkipResult) -> + ?SLOG(info, #{msg => "ignore_mfa_transactions", reason => "Node is in leaving"}), + ?TIMEOUT. 
read_next_mfa(Node) -> NextId = diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index e0685b2ff..a6357b0b0 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -120,6 +120,7 @@ cluster(["join", SNode]) -> emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error]) end; cluster(["leave"]) -> + emqx_cluster_rpc:on_leave(), case ekka:leave() of ok -> emqx_ctl:print("Leave the cluster successfully.~n"), From 77bcb739190f9ea437f89731ca0c4f408d1bf82f Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 27 Jun 2023 12:11:59 +0000 Subject: [PATCH 2/6] chore: update changes --- changes/ce/fix-11148.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-11148.en.md diff --git a/changes/ce/fix-11148.en.md b/changes/ce/fix-11148.en.md new file mode 100644 index 000000000..931704104 --- /dev/null +++ b/changes/ce/fix-11148.en.md @@ -0,0 +1 @@ +Fixed an issue where, after a node left the cluster, other nodes still tried to synchronize configuration update operations to it.
From 8e68af14ef8c072072ee0752481772bf49fa0208 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 28 Jun 2023 11:15:12 +0000 Subject: [PATCH 3/6] chore: bump ekka version --- apps/emqx/rebar.config | 2 +- mix.exs | 2 +- rebar.config | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 1061d840a..33ed815fd 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,7 +27,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.4"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.10"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, diff --git a/mix.exs b/mix.exs index 9de0ad494..f0aa44918 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.6", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.15.4", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.15.5", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.11", override: true}, diff --git a/rebar.config b/rebar.config index c6320c519..9ec3133ce 100644 --- a/rebar.config +++ b/rebar.config @@ -62,7 +62,7 @@ , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}} , {rocksdb, {git, 
"https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.4"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.11"}}} From 71d34504bf983740d91ebf911ae170d89ccf234d Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 28 Jun 2023 10:38:36 +0000 Subject: [PATCH 4/6] fix(cluster_rpc): clean self-data by mria stop callback --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 10 +++++++--- apps/emqx_management/src/emqx_mgmt_cli.erl | 1 - 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 32a91813f..5d182d095 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -29,7 +29,7 @@ status/0, skip_failed_commit/1, fast_forward_to_commit/2, - on_leave/0 + on_mria_stop/1 ]). -export([ commit/2, @@ -271,14 +271,18 @@ fast_forward_to_commit(Node, ToTnxId) -> gen_server:call({?MODULE, Node}, {fast_forward_to_commit, ToTnxId}). %% It is necessary to clean this node commit record in the cluster -on_leave() -> - gen_server:call(?MODULE, on_leave). +on_mria_stop(leave) -> + gen_server:call(?MODULE, on_leave); +on_mria_stop(_) -> + ok. 
+ %%%=================================================================== %%% gen_server callbacks %%%=================================================================== %% @private init([Node, RetryMs]) -> + mria:register_callback(stop, fun ?MODULE:on_mria_stop/1), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), State = #{node => Node, retry_interval => RetryMs, is_leaving => false}, %% The init transaction ID is set in emqx_conf_app after diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index a6357b0b0..e0685b2ff 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -120,7 +120,6 @@ cluster(["join", SNode]) -> emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error]) end; cluster(["leave"]) -> - emqx_cluster_rpc:on_leave(), case ekka:leave() of ok -> emqx_ctl:print("Leave the cluster successfully.~n"), From 460ea0e9578c41520ba73062fa223a1f230ff56d Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Thu, 29 Jun 2023 09:28:50 +0200 Subject: [PATCH 5/6] ci(jmeter): collect emqx logs --- .github/workflows/run_jmeter_tests.yaml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/run_jmeter_tests.yaml b/.github/workflows/run_jmeter_tests.yaml index a295c47da..536b70ac3 100644 --- a/.github/workflows/run_jmeter_tests.yaml +++ b/.github/workflows/run_jmeter_tests.yaml @@ -145,6 +145,8 @@ jobs: pgsql_authn_authz: runs-on: ubuntu-22.04 + env: + _EMQX_DOCKER_IMAGE_TAG: emqx/emqx:${{ needs.build_emqx_for_jmeter_tests.outputs.version }} strategy: fail-fast: false @@ -175,7 +177,6 @@ jobs: - name: docker compose up timeout-minutes: 5 env: - _EMQX_DOCKER_IMAGE_TAG: emqx/emqx:${{ needs.build_emqx_for_jmeter_tests.outputs.version }} PGSQL_TAG: ${{ matrix.pgsql_tag }} run: | docker-compose \ @@ -246,6 +247,10 @@ jobs: echo "check logs failed" exit 1 fi + - name: dump docker compose logs + if: failure() + run: | + docker-compose 
-f .ci/docker-compose-file/docker-compose-emqx-cluster.yaml logs --no-color > ./jmeter_logs/emqx.log - uses: actions/upload-artifact@v3 if: always() with: From afabdc344008bfaea5ba6146fc0563c7ee5dbab4 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 3 Jul 2023 17:14:32 +0800 Subject: [PATCH 6/6] fix(emqx_conf): call clean callback before ekka stop --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 5d182d095..599b8474b 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -282,7 +282,7 @@ on_mria_stop(_) -> %% @private init([Node, RetryMs]) -> - mria:register_callback(stop, fun ?MODULE:on_mria_stop/1), + register_mria_stop_cb(fun ?MODULE:on_mria_stop/1), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), State = #{node => Node, retry_interval => RetryMs, is_leaving => false}, %% The init transaction ID is set in emqx_conf_app after @@ -643,3 +643,22 @@ do_wait_for_emqx_ready2(N) -> timer:sleep(100), do_wait_for_emqx_ready2(N - 1) end. + +register_mria_stop_cb(Callback) -> + case mria_config:callback(stop) of + undefined -> + mria:register_callback(stop, Callback); + {ok, Previous} -> + mria:register_callback( + stop, + fun(Arg) -> + Callback(Arg), + case erlang:fun_info(Previous, arity) of + {arity, 0} -> + Previous(); + {arity, 1} -> + Previous(Arg) + end + end + ) + end.