fix: clean rpc_commit when force_leave cluster

This commit is contained in:
zhongwencool 2023-12-26 15:22:07 +08:00
parent 95194216cc
commit 11feab983f
5 changed files with 72 additions and 5 deletions

View File

@ -30,6 +30,7 @@
skip_failed_commit/1, skip_failed_commit/1,
fast_forward_to_commit/2, fast_forward_to_commit/2,
on_mria_stop/1, on_mria_stop/1,
force_leave_clean/1,
wait_for_cluster_rpc/0, wait_for_cluster_rpc/0,
maybe_init_tnx_id/2 maybe_init_tnx_id/2
]). ]).
@ -44,6 +45,7 @@
read_next_mfa/1, read_next_mfa/1,
trans_query/1, trans_query/1,
trans_status/0, trans_status/0,
on_leave_clean/1,
on_leave_clean/0, on_leave_clean/0,
get_commit_lag/0, get_commit_lag/0,
get_commit_lag/1 get_commit_lag/1
@ -220,7 +222,10 @@ status() ->
transaction(fun ?MODULE:trans_status/0, []). transaction(fun ?MODULE:trans_status/0, []).
on_leave_clean() -> on_leave_clean() ->
mnesia:delete({?CLUSTER_COMMIT, node()}). on_leave_clean(node()).
on_leave_clean(Node) ->
mnesia:delete({?CLUSTER_COMMIT, Node}).
-spec latest_tnx_id() -> pos_integer(). -spec latest_tnx_id() -> pos_integer().
latest_tnx_id() -> latest_tnx_id() ->
@ -301,6 +306,12 @@ on_mria_stop(leave) ->
on_mria_stop(_) -> on_mria_stop(_) ->
ok. ok.
force_leave_clean(Node) ->
case transaction(fun ?MODULE:on_leave_clean/1, [Node]) of
{atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason}
end.
wait_for_cluster_rpc() -> wait_for_cluster_rpc() ->
%% Workaround for https://github.com/emqx/mria/issues/94: %% Workaround for https://github.com/emqx/mria/issues/94:
Msg1 = #{msg => "wait_for_cluster_rpc_shard"}, Msg1 = #{msg => "wait_for_cluster_rpc_shard"},

View File

@ -1,6 +1,6 @@
{application, emqx_conf, [ {application, emqx_conf, [
{description, "EMQX configuration management"}, {description, "EMQX configuration management"},
{vsn, "0.1.32"}, {vsn, "0.1.33"},
{registered, []}, {registered, []},
{mod, {emqx_conf_app, []}}, {mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib, emqx_ctl]}, {applications, [kernel, stdlib, emqx_ctl]},

View File

@ -115,10 +115,19 @@ cluster(["leave"]) ->
emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error]) emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
end; end;
cluster(["force-leave", SNode]) -> cluster(["force-leave", SNode]) ->
case mria:force_leave(ekka_node:parse_name(SNode)) of Node = ekka_node:parse_name(SNode),
case mria:force_leave(Node) of
ok -> ok ->
emqx_ctl:print("Remove the node from cluster successfully.~n"), case emqx_cluster_rpc:force_leave_clean(Node) of
cluster(["status"]); ok ->
emqx_ctl:print("Remove the node from cluster successfully.~n"),
cluster(["status"]);
{error, Reason} ->
emqx_ctl:print(
"Failed to remove the node from cluster_rpc.~n~p~n",
[Reason]
)
end;
ignore -> ignore ->
emqx_ctl:print("Ignore.~n"); emqx_ctl:print("Ignore.~n");
{error, Error} -> {error, Error} ->

View File

@ -46,11 +46,57 @@ t_broker(_Config) ->
ok. ok.
t_cluster(_Config) -> t_cluster(_Config) ->
SelfNode = node(),
FakeNode = 'fake@127.0.0.1',
MFA = {io, format, [""]},
meck:new(mria_mnesia, [non_strict, passthrough, no_link]),
meck:expect(mria_mnesia, running_nodes, 0, [SelfNode, FakeNode]),
{atomic, {ok, TnxId, _}} =
mria:transaction(
emqx_cluster_rpc_shard,
fun emqx_cluster_rpc:init_mfa/2,
[SelfNode, MFA]
),
emqx_cluster_rpc:maybe_init_tnx_id(FakeNode, TnxId),
?assertMatch(
{atomic, [
#{
node := SelfNode,
mfa := MFA,
created_at := _,
tnx_id := TnxId,
initiator := SelfNode
},
#{
node := FakeNode,
mfa := MFA,
created_at := _,
tnx_id := TnxId,
initiator := SelfNode
}
]},
emqx_cluster_rpc:status()
),
%% cluster join <Node> # Join the cluster %% cluster join <Node> # Join the cluster
%% cluster leave # Leave the cluster %% cluster leave # Leave the cluster
%% cluster force-leave <Node> # Force the node leave from cluster %% cluster force-leave <Node> # Force the node leave from cluster
%% cluster status # Cluster status %% cluster status # Cluster status
emqx_ctl:run_command(["cluster", "status"]), emqx_ctl:run_command(["cluster", "status"]),
emqx_ctl:run_command(["cluster", "force-leave", atom_to_list(FakeNode)]),
?assertMatch(
{atomic, [
#{
node := SelfNode,
mfa := MFA,
created_at := _,
tnx_id := TnxId,
initiator := SelfNode
}
]},
emqx_cluster_rpc:status()
),
meck:unload(mria_mnesia),
ok. ok.
t_clients(_Config) -> t_clients(_Config) ->

View File

@ -0,0 +1 @@
Fixed issue where commit log info was incorrectly retained after forcefully leaving a cluster, removing unnecessary commit log data improves leave operation cleanup.