fix(emqx_conf): don't sync the MFA to a leaved node

This commit is contained in:
firest 2023-06-27 12:05:10 +00:00
parent f9ea924cd0
commit 51ec8cb8fa
2 changed files with 21 additions and 5 deletions

View File

@ -28,7 +28,8 @@
reset/0, reset/0,
status/0, status/0,
skip_failed_commit/1, skip_failed_commit/1,
fast_forward_to_commit/2 fast_forward_to_commit/2,
on_leave/0
]). ]).
-export([ -export([
commit/2, commit/2,
@ -40,7 +41,8 @@
make_initiate_call_req/3, make_initiate_call_req/3,
read_next_mfa/1, read_next_mfa/1,
trans_query/1, trans_query/1,
trans_status/0 trans_status/0,
on_leave_clean/0
]). ]).
-export([ -export([
@ -211,6 +213,9 @@ reset() -> gen_server:call(?MODULE, reset).
status() -> status() ->
transaction(fun ?MODULE:trans_status/0, []). transaction(fun ?MODULE:trans_status/0, []).
on_leave_clean() ->
mnesia:delete({?CLUSTER_COMMIT, node()}).
-spec latest_tnx_id() -> pos_integer(). -spec latest_tnx_id() -> pos_integer().
latest_tnx_id() -> latest_tnx_id() ->
{atomic, TnxId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []), {atomic, TnxId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []),
@ -264,6 +269,10 @@ skip_failed_commit(Node) ->
-spec fast_forward_to_commit(node(), pos_integer()) -> pos_integer(). -spec fast_forward_to_commit(node(), pos_integer()) -> pos_integer().
fast_forward_to_commit(Node, ToTnxId) -> fast_forward_to_commit(Node, ToTnxId) ->
gen_server:call({?MODULE, Node}, {fast_forward_to_commit, ToTnxId}). gen_server:call({?MODULE, Node}, {fast_forward_to_commit, ToTnxId}).
%% It is necessary to clean this node commit record in the cluster
on_leave() ->
gen_server:call(?MODULE, on_leave).
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
@ -271,7 +280,7 @@ fast_forward_to_commit(Node, ToTnxId) ->
%% @private %% @private
init([Node, RetryMs]) -> init([Node, RetryMs]) ->
{ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
State = #{node => Node, retry_interval => RetryMs}, State = #{node => Node, retry_interval => RetryMs, is_leaving => false},
%% The init transaction ID is set in emqx_conf_app after %% The init transaction ID is set in emqx_conf_app after
%% it has fetched the latest config from one of the core nodes %% it has fetched the latest config from one of the core nodes
TnxId = emqx_app:get_init_tnx_id(), TnxId = emqx_app:get_init_tnx_id(),
@ -306,6 +315,9 @@ handle_call(skip_failed_commit, _From, State = #{node := Node}) ->
handle_call({fast_forward_to_commit, ToTnxId}, _From, State) -> handle_call({fast_forward_to_commit, ToTnxId}, _From, State) ->
NodeId = do_fast_forward_to_commit(ToTnxId, State), NodeId = do_fast_forward_to_commit(ToTnxId, State),
{reply, NodeId, State, catch_up(State)}; {reply, NodeId, State, catch_up(State)};
handle_call(on_leave, _From, State) ->
{atomic, ok} = transaction(fun ?MODULE:on_leave_clean/0, []),
{reply, ok, State#{is_leaving := true}};
handle_call(_, _From, State) -> handle_call(_, _From, State) ->
{reply, ok, State, catch_up(State)}. {reply, ok, State, catch_up(State)}.
@ -328,7 +340,7 @@ code_change(_OldVsn, State, _Extra) ->
%%%=================================================================== %%%===================================================================
catch_up(State) -> catch_up(State, false). catch_up(State) -> catch_up(State, false).
catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State, SkipResult) ->
case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of
{atomic, caught_up} -> {atomic, caught_up} ->
?TIMEOUT; ?TIMEOUT;
@ -353,7 +365,10 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
{aborted, Reason} -> {aborted, Reason} ->
?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}), ?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}),
RetryMs RetryMs
end. end;
catch_up(#{is_leaving := true}, _SkipResult) ->
?SLOG(info, #{msg => "ignore_mfa_transactions", reason => "Node is in leaving"}),
?TIMEOUT.
read_next_mfa(Node) -> read_next_mfa(Node) ->
NextId = NextId =

View File

@ -120,6 +120,7 @@ cluster(["join", SNode]) ->
emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error]) emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
end; end;
cluster(["leave"]) -> cluster(["leave"]) ->
emqx_cluster_rpc:on_leave(),
case ekka:leave() of case ekka:leave() of
ok -> ok ->
emqx_ctl:print("Leave the cluster successfully.~n"), emqx_ctl:print("Leave the cluster successfully.~n"),