%% Collapsed unified diff for apps/emqx_conf/src/emqx_cluster_rpc.erl (original line breaks were lost).
%% What the hunks on this line do:
%% - Export update_mfa/3 and update_mfa_in_trans/3.
%% - update_mfa_in_trans(Node, MFA, NodeTnxId): write-locks ?CLUSTER_MFA, then compares
%%   get_node_tnx_id(Node) with NodeTnxId. On a match it writes a #cluster_rpc_mfa{} record with
%%   tnx_id = NodeTnxId + 1 and calls emqx_cluster_rpc:commit(N, NodeTnxId) for every node returned
%%   by mria:running_nodes(). On a mismatch it aborts the transaction with {error, Reason}.
%%   NOTE(review): commit/2 is given NodeTnxId rather than the new TnxId; presumably so that each
%%   node still has the new entry left to apply. Confirm against the apply loop.
%% - update_mfa(Node, MFA, LatestId): runs update_mfa_in_trans/3 through transaction/2; returns ok
%%   for {atomic, ok} and the bare abort term for {aborted, Error}.
%% - trans_status/0: ties on tnx_id are now ordered by the node's position in mria:running_nodes()
%%   (index_nodes/2, which yields 0 for a node missing from that list) instead of by node name.
diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index eb605f977..283e17ed7 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -34,7 +34,8 @@ on_mria_stop/1, force_leave_clean/1, wait_for_cluster_rpc/0, - maybe_init_tnx_id/2 + maybe_init_tnx_id/2, + update_mfa/3 ]). -export([ commit/2, @@ -42,6 +43,7 @@ get_cluster_tnx_id/0, get_node_tnx_id/1, init_mfa/2, + update_mfa_in_trans/3, latest_tnx_id/0, make_initiate_call_req/3, read_next_mfa/1, @@ -546,6 +548,36 @@ init_mfa(Node, MFA) -> {retry, Meta} end. +update_mfa_in_trans(Node, MFA, NodeTnxId) -> + mnesia:write_lock_table(?CLUSTER_MFA), + case get_node_tnx_id(Node) of + NodeTnxId -> + TnxId = NodeTnxId + 1, + MFARec = #cluster_rpc_mfa{ + tnx_id = TnxId, + mfa = MFA, + initiator = Node, + created_at = erlang:localtime() + }, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + lists:foreach( + fun(N) -> + ok = emqx_cluster_rpc:commit(N, NodeTnxId) + end, + mria:running_nodes() + ); + NewTnxId -> + Fmt = "someone_has_already_updated,tnx_id(~w) is not the latest(~w)", + Reason = emqx_utils:format(Fmt, [NodeTnxId, NewTnxId]), + mnesia:abort({error, Reason}) + end. + +update_mfa(Node, MFA, LatestId) -> + case transaction(fun ?MODULE:update_mfa_in_trans/3, [Node, MFA, LatestId]) of + {atomic, ok} -> ok; + {aborted, Error} -> Error + end. + transaction(Func, Args) -> mria:transaction(?CLUSTER_RPC_SHARD, Func, Args). @@ -574,13 +606,21 @@ trans_status() -> [], ?CLUSTER_COMMIT ), + Nodes = mria:running_nodes(), + IndexNodes = lists:zip(Nodes, lists:seq(1, length(Nodes))), lists:sort( fun(#{node := NA, tnx_id := IdA}, #{node := NB, tnx_id := IdB}) -> - {IdA, NA} > {IdB, NB} + {IdA, index_nodes(NA, IndexNodes)} > {IdB, index_nodes(NB, IndexNodes)} end, List ). +index_nodes(Node, IndexNodes) -> + case lists:keyfind(Node, 1, IndexNodes) of + false -> 0; + {_, Index} -> Index + end. 
%% Continuation of the collapsed diff: the trailing context of the emqx_cluster_rpc.erl hunk
%% (trans_query/1), followed by the first hunks of apps/emqx_conf/src/emqx_conf_cli.erl.
%% What the emqx_conf_cli.erl hunks on this line do:
%% - Export mark_fix_log/1.
%% - admins(["fix"]), `Role ->` branch: the node named in the "Run fix command on ..." message now
%%   comes from emqx_cluster_rpc:find_leader() instead of find_highest_node/1 plus
%%   mria_status:upstream_node/1.
%% - fix_lagging_with_raw/3 gains ToTnxId as its first argument. It calls
%%   mark_fix_begin(Node, ToTnxId) where it used to call emqx_cluster_rpc:reset(), loads the raw
%%   config only when that returns ok, and reports {error, Reason} through emqx_ctl:warning/2.
%% - mark_fix_begin(Node, TnxId): reads emqx_cluster_rpc:status() and hands
%%   {?MODULE, mark_fix_log, [Status]} to emqx_cluster_rpc:update_mfa/3.
%% - mark_fix_log(Status): emits a warning-level ?SLOG entry carrying Status and returns ok.
+ trans_query(TnxId) -> case mnesia:read(?CLUSTER_MFA, TnxId) of [] -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 76ac83f16..d332d31d7 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -25,7 +25,8 @@ admins/1, conf/1, audit/3, - unload/0 + unload/0, + mark_fix_log/1 ]). -export([keys/0, get_config/0, get_config/1, load_config/2]). @@ -104,15 +105,8 @@ admins(["fix"]) -> StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]); Role -> - Core = - case find_highest_node(Status) of - {same_tnx_id, _TnxId} -> - {ok, Node} = mria_status:upstream_node(?CLUSTER_RPC_SHARD), - Node; - {ok, Node} -> - Node - end, - emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Core, Role]) + Leader = emqx_cluster_rpc:find_leader(), + emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Leader, Role]) end; admins(["fast_forward"]) -> status(), @@ -135,7 +129,7 @@ admins(["fast_forward", Node0, ToTnxId]) -> admins(_) -> emqx_ctl:usage(usage_sync()). -fix_lagging_with_raw(Node, Keys) -> +fix_lagging_with_raw(ToTnxId, Node, Keys) -> Confs = lists:foldl( fun(Key, Acc) -> KeyRaw = atom_to_binary(Key), @@ -144,12 +138,25 @@ fix_lagging_with_raw(Node, Keys) -> #{}, Keys ), - ok = emqx_cluster_rpc:reset(), - case load_config_from_raw(Confs, #{mode => replace}) of - ok -> waiting_for_fix_finish(); - Error -> Error + case mark_fix_begin(Node, ToTnxId) of + ok -> + case load_config_from_raw(Confs, #{mode => replace}) of + ok -> waiting_for_fix_finish(); + Error0 -> Error0 + end; + {error, Reason} -> + emqx_ctl:warning("mark fix begin failed: ~s~n", [Reason]) end. +mark_fix_begin(Node, TnxId) -> + {atomic, Status} = emqx_cluster_rpc:status(), + MFA = {?MODULE, mark_fix_log, [Status]}, + emqx_cluster_rpc:update_mfa(Node, MFA, TnxId). + +mark_fix_log(Status) -> + ?SLOG(warning, #{msg => cluster_fix_log, status => Status}), + ok. 
%% Continuation of the collapsed emqx_conf_cli.erl diff: audit/3 context, then the hunks for
%% maybe_fix_lagging/2, find_lagging/2, find_highest_node/1 and the find_inconsistent_test/0
%% expectations.
%% - find_highest_node/1 now returns {ok, TnxId, Target}.
%% - find_lagging/2 forwards that as {inconsistent_tnx_id, TnxId, Target} or
%%   {inconsistent_tnx_id_key, TnxId, Target, ChangedKeys}; the tests pin both shapes.
%% - maybe_fix_lagging/2 destructures both tuples in that same order (tnx id first, node second).
%%   Binding the inconsistent_tnx_id tuple the other way round would pass the tnx id to
%%   mark_fix_begin/2 as the node and evaluate `Node + 1` in the "Forward tnxid" message.
+ audit(Level, From, Log) -> ?AUDIT(Level, redact(Log#{from => From})). @@ -213,25 +220,25 @@ maybe_fix_lagging(Status, #{fix := Fix}) -> %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) AllConfs = find_running_confs(), case find_lagging(Status, AllConfs) of - {inconsistent_tnx_id_key, Target, InconsistentKeys} when Fix -> - _ = fix_lagging_with_raw(Target, InconsistentKeys), + {inconsistent_tnx_id_key, ToTnxId, Target, InconsistentKeys} when Fix -> + _ = fix_lagging_with_raw(ToTnxId, Target, InconsistentKeys), ok; - {inconsistent_tnx_id_key, Target, InconsistentKeys} -> + {inconsistent_tnx_id_key, _ToTnxId, Target, InconsistentKeys} -> emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs); - inconsistent_tnx_id when Fix -> + {inconsistent_tnx_id, ToTnxId, Target} when Fix -> print_tnx_id_status(Status), - ok = emqx_cluster_rpc:reset(), - emqx_ctl:print("Reset tnxid to 0 successfully~n"); - inconsistent_tnx_id -> + ok = mark_fix_begin(Target, ToTnxId), + emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]); + {inconsistent_tnx_id, _ToTnxId, _Target} -> print_tnx_id_status(Status), Leader = emqx_cluster_rpc:find_leader(), emqx_ctl:print(?SUGGESTION(Leader)); - {inconsistent_key, TnxId, InconsistentKeys} -> + {inconsistent_key, ToTnxId, InconsistentKeys} -> [{Target, _} | _] = AllConfs, print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), emqx_ctl:warning("All configuration synchronized(tnx_id=~w)~n", [ - TnxId + ToTnxId ]), emqx_ctl:warning( "but inconsistent keys were found: ~p, which come from environment variables or etc/emqx.conf.~n", @@ -693,11 +700,11 @@ find_lagging(Status, AllConfs) -> InconsistentKeys -> {inconsistent_key, TnxId, InconsistentKeys} end; - {ok, Target} -> + {ok, TargetId, Target} -> {value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs), case find_inconsistent_key(TargetConf, OtherConfs) of - [] 
-> inconsistent_tnx_id; - ChangedKeys -> {inconsistent_tnx_id_key, Target, ChangedKeys} + [] -> {inconsistent_tnx_id, TargetId, Target}; + ChangedKeys -> {inconsistent_tnx_id_key, TargetId, Target, ChangedKeys} end end. @@ -722,8 +729,8 @@ find_highest_node(Status) -> case lists:usort(fun({A, _}, {B, _}) -> A >= B end, Ids) of [{TnxId, _}] -> {same_tnx_id, TnxId}; - [{_TnxId, Target} | _] -> - {ok, Target} + [{TnxId, Target} | _] -> + {ok, TnxId, Target} end. changed(K, V, Conf) -> @@ -870,7 +877,7 @@ find_inconsistent_test() -> {node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}} ], ?assertEqual( - {inconsistent_tnx_id_key, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0) + {inconsistent_tnx_id_key, 3, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0) ), %% conf is the same, no changed @@ -879,7 +886,7 @@ find_inconsistent_test() -> {node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, {node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}} ], - ?assertEqual(inconsistent_tnx_id, find_lagging(Status, NoDiffConfs)), + ?assertEqual({inconsistent_tnx_id, 3, node2}, find_lagging(Status, NoDiffConfs)), %% same tnx_id SameStatus = [