diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 9e930e693..32a91813f 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -28,7 +28,8 @@ reset/0, status/0, skip_failed_commit/1, - fast_forward_to_commit/2 + fast_forward_to_commit/2, + on_leave/0 ]). -export([ commit/2, @@ -40,7 +41,8 @@ make_initiate_call_req/3, read_next_mfa/1, trans_query/1, - trans_status/0 + trans_status/0, + on_leave_clean/0 ]). -export([ @@ -211,6 +213,9 @@ reset() -> gen_server:call(?MODULE, reset). status() -> transaction(fun ?MODULE:trans_status/0, []). +on_leave_clean() -> + mnesia:delete({?CLUSTER_COMMIT, node()}). + -spec latest_tnx_id() -> pos_integer(). latest_tnx_id() -> {atomic, TnxId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []), @@ -264,6 +269,10 @@ skip_failed_commit(Node) -> -spec fast_forward_to_commit(node(), pos_integer()) -> pos_integer(). fast_forward_to_commit(Node, ToTnxId) -> gen_server:call({?MODULE, Node}, {fast_forward_to_commit, ToTnxId}). + +%% Delete this node's commit record from the cluster before the node leaves it +on_leave() -> + gen_server:call(?MODULE, on_leave). 
%%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -271,7 +280,7 @@ fast_forward_to_commit(Node, ToTnxId) -> %% @private init([Node, RetryMs]) -> {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), - State = #{node => Node, retry_interval => RetryMs}, + State = #{node => Node, retry_interval => RetryMs, is_leaving => false}, %% The init transaction ID is set in emqx_conf_app after %% it has fetched the latest config from one of the core nodes TnxId = emqx_app:get_init_tnx_id(), @@ -306,6 +315,9 @@ handle_call(skip_failed_commit, _From, State = #{node := Node}) -> handle_call({fast_forward_to_commit, ToTnxId}, _From, State) -> NodeId = do_fast_forward_to_commit(ToTnxId, State), {reply, NodeId, State, catch_up(State)}; +handle_call(on_leave, _From, State) -> + {atomic, ok} = transaction(fun ?MODULE:on_leave_clean/0, []), + {reply, ok, State#{is_leaving := true}}; handle_call(_, _From, State) -> {reply, ok, State, catch_up(State)}. @@ -328,7 +340,7 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== catch_up(State) -> catch_up(State, false). -catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> +catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State, SkipResult) -> case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of {atomic, caught_up} -> ?TIMEOUT; @@ -353,7 +365,10 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> {aborted, Reason} -> ?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}), RetryMs - end. + end; +catch_up(#{is_leaving := true}, _SkipResult) -> + ?SLOG(info, #{msg => "ignore_mfa_transactions", reason => "Node is in leaving"}), + ?TIMEOUT. 
read_next_mfa(Node) -> NextId = diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index e0685b2ff..a6357b0b0 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -120,6 +120,7 @@ cluster(["join", SNode]) -> emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error]) end; cluster(["leave"]) -> + emqx_cluster_rpc:on_leave(), case ekka:leave() of ok -> emqx_ctl:print("Leave the cluster successfully.~n"),