feat: don't reset tnx_id when cluster_fix

This commit is contained in:
zhongwencool 2024-07-05 09:00:32 +08:00
parent 298211d101
commit f490a0cba2
2 changed files with 80 additions and 33 deletions

View File

@ -34,7 +34,8 @@
on_mria_stop/1,
force_leave_clean/1,
wait_for_cluster_rpc/0,
maybe_init_tnx_id/2
maybe_init_tnx_id/2,
update_mfa/3
]).
-export([
commit/2,
@ -42,6 +43,7 @@
get_cluster_tnx_id/0,
get_node_tnx_id/1,
init_mfa/2,
update_mfa_in_trans/3,
latest_tnx_id/0,
make_initiate_call_req/3,
read_next_mfa/1,
@ -546,6 +548,36 @@ init_mfa(Node, MFA) ->
{retry, Meta}
end.
update_mfa_in_trans(Node, MFA, NodeTnxId) ->
mnesia:write_lock_table(?CLUSTER_MFA),
case get_node_tnx_id(Node) of
NodeTnxId ->
TnxId = NodeTnxId + 1,
MFARec = #cluster_rpc_mfa{
tnx_id = TnxId,
mfa = MFA,
initiator = Node,
created_at = erlang:localtime()
},
ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
lists:foreach(
fun(N) ->
ok = emqx_cluster_rpc:commit(N, NodeTnxId)
end,
mria:running_nodes()
);
NewTnxId ->
Fmt = "someone_has_already_updated,tnx_id(~w) is not the latest(~w)",
Reason = emqx_utils:format(Fmt, [NodeTnxId, NewTnxId]),
mnesia:abort({error, Reason})
end.
update_mfa(Node, MFA, LatestId) ->
case transaction(fun ?MODULE:update_mfa_in_trans/3, [Node, MFA, LatestId]) of
{atomic, ok} -> ok;
{aborted, Error} -> Error
end.
transaction(Func, Args) ->
mria:transaction(?CLUSTER_RPC_SHARD, Func, Args).
@ -574,13 +606,21 @@ trans_status() ->
[],
?CLUSTER_COMMIT
),
Nodes = mria:running_nodes(),
IndexNodes = lists:zip(Nodes, lists:seq(1, length(Nodes))),
lists:sort(
fun(#{node := NA, tnx_id := IdA}, #{node := NB, tnx_id := IdB}) ->
{IdA, NA} > {IdB, NB}
{IdA, index_nodes(NA, IndexNodes)} > {IdB, index_nodes(NB, IndexNodes)}
end,
List
).
index_nodes(Node, IndexNodes) ->
case lists:keyfind(Node, 1, IndexNodes) of
false -> 0;
{_, Index} -> Index
end.
trans_query(TnxId) ->
case mnesia:read(?CLUSTER_MFA, TnxId) of
[] ->

View File

@ -25,7 +25,8 @@
admins/1,
conf/1,
audit/3,
unload/0
unload/0,
mark_fix_log/1
]).
-export([keys/0, get_config/0, get_config/1, load_config/2]).
@ -104,15 +105,8 @@ admins(["fix"]) ->
StoppedNodes =/= [] andalso
emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]);
Role ->
Core =
case find_highest_node(Status) of
{same_tnx_id, _TnxId} ->
{ok, Node} = mria_status:upstream_node(?CLUSTER_RPC_SHARD),
Node;
{ok, Node} ->
Node
end,
emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Core, Role])
Leader = emqx_cluster_rpc:find_leader(),
emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Leader, Role])
end;
admins(["fast_forward"]) ->
status(),
@ -135,7 +129,7 @@ admins(["fast_forward", Node0, ToTnxId]) ->
admins(_) ->
emqx_ctl:usage(usage_sync()).
fix_lagging_with_raw(Node, Keys) ->
fix_lagging_with_raw(ToTnxId, Node, Keys) ->
Confs = lists:foldl(
fun(Key, Acc) ->
KeyRaw = atom_to_binary(Key),
@ -144,12 +138,25 @@ fix_lagging_with_raw(Node, Keys) ->
#{},
Keys
),
ok = emqx_cluster_rpc:reset(),
case load_config_from_raw(Confs, #{mode => replace}) of
ok -> waiting_for_fix_finish();
Error -> Error
case mark_fix_begin(Node, ToTnxId) of
ok ->
case load_config_from_raw(Confs, #{mode => replace}) of
ok -> waiting_for_fix_finish();
Error0 -> Error0
end;
{error, Reason} ->
emqx_ctl:warning("mark fix begin failed: ~s~n", [Reason])
end.
mark_fix_begin(Node, TnxId) ->
{atomic, Status} = emqx_cluster_rpc:status(),
MFA = {?MODULE, mark_fix_log, [Status]},
emqx_cluster_rpc:update_mfa(Node, MFA, TnxId).
mark_fix_log(Status) ->
?SLOG(warning, #{msg => cluster_fix_log, status => Status}),
ok.
audit(Level, From, Log) ->
?AUDIT(Level, redact(Log#{from => From})).
@ -213,25 +220,25 @@ maybe_fix_lagging(Status, #{fix := Fix}) ->
%% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s)
AllConfs = find_running_confs(),
case find_lagging(Status, AllConfs) of
{inconsistent_tnx_id_key, Target, InconsistentKeys} when Fix ->
_ = fix_lagging_with_raw(Target, InconsistentKeys),
{inconsistent_tnx_id_key, ToTnxId, Target, InconsistentKeys} when Fix ->
_ = fix_lagging_with_raw(ToTnxId, Target, InconsistentKeys),
ok;
{inconsistent_tnx_id_key, Target, InconsistentKeys} ->
{inconsistent_tnx_id_key, _ToTnxId, Target, InconsistentKeys} ->
emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]),
print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs);
inconsistent_tnx_id when Fix ->
{inconsistent_tnx_id, Target, ToTnxId} when Fix ->
print_tnx_id_status(Status),
ok = emqx_cluster_rpc:reset(),
emqx_ctl:print("Reset tnxid to 0 successfully~n");
inconsistent_tnx_id ->
ok = mark_fix_begin(Target, ToTnxId),
emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]);
{inconsistent_tnx_id, _Target, _ToTnxId} ->
print_tnx_id_status(Status),
Leader = emqx_cluster_rpc:find_leader(),
emqx_ctl:print(?SUGGESTION(Leader));
{inconsistent_key, TnxId, InconsistentKeys} ->
{inconsistent_key, ToTnxId, InconsistentKeys} ->
[{Target, _} | _] = AllConfs,
print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs),
emqx_ctl:warning("All configuration synchronized(tnx_id=~w)~n", [
TnxId
ToTnxId
]),
emqx_ctl:warning(
"but inconsistent keys were found: ~p, which come from environment variables or etc/emqx.conf.~n",
@ -693,11 +700,11 @@ find_lagging(Status, AllConfs) ->
InconsistentKeys ->
{inconsistent_key, TnxId, InconsistentKeys}
end;
{ok, Target} ->
{ok, TargetId, Target} ->
{value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs),
case find_inconsistent_key(TargetConf, OtherConfs) of
[] -> inconsistent_tnx_id;
ChangedKeys -> {inconsistent_tnx_id_key, Target, ChangedKeys}
[] -> {inconsistent_tnx_id, TargetId, Target};
ChangedKeys -> {inconsistent_tnx_id_key, TargetId, Target, ChangedKeys}
end
end.
@ -722,8 +729,8 @@ find_highest_node(Status) ->
case lists:usort(fun({A, _}, {B, _}) -> A >= B end, Ids) of
[{TnxId, _}] ->
{same_tnx_id, TnxId};
[{_TnxId, Target} | _] ->
{ok, Target}
[{TnxId, Target} | _] ->
{ok, TnxId, Target}
end.
changed(K, V, Conf) ->
@ -870,7 +877,7 @@ find_inconsistent_test() ->
{node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}}
],
?assertEqual(
{inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0)
{inconsistent_tnx_id_key, 3, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0)
),
%% conf is the same, no changed
@ -879,7 +886,7 @@ find_inconsistent_test() ->
{node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}},
{node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}
],
?assertEqual(inconsistent_tnx_id, find_lagging(Status, NoDiffConfs)),
?assertEqual({inconsistent_tnx_id, 3, node2}, find_lagging(Status, NoDiffConfs)),
%% same tnx_id
SameStatus = [