From f9e9748cecf5e54025b9077fbd8f6c10fbcdc988 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 7 Nov 2023 17:06:08 +0100 Subject: [PATCH] fix(cluster-rpc): boot from local config if table loaded When EMQX boots up, it tries to get latest config from peer (core type) nodes, if none of the nodes are replying, the node will decide to boot with local config (and replay the committed changes) if the commit table is loaded from disk locally (an indication of the data being latest), otherwise it will sleep for 1-2 seconds and retry. This lead to a race condition, e.g. in a two nodes cluster: 1. node1 boots up 2. node2 boots up and copy mnesia table from node1 3. node1 restart before node2 can sync cluster.hocon from it 4. node1 boots up and copy mnesia table from node2 Now that both node1 and node2 has the mnesia `load_node` pointing to each other (i.e. not a local disk load). Prior to this fix, the nodes would wait for each other in a dead loop. This commit fixes the issue by allowing node to boot with local config if it does not have a lagging. --- apps/emqx_conf/include/emqx_conf.hrl | 1 + apps/emqx_conf/src/emqx_cluster_rpc.erl | 33 ++++- apps/emqx_conf/src/emqx_conf_app.erl | 189 +++++++++++++----------- changes/ce/fix-11897.en.md | 1 + 4 files changed, 130 insertions(+), 94 deletions(-) create mode 100644 changes/ce/fix-11897.en.md diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 4ae2b1df9..2b4d48173 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -21,6 +21,7 @@ -define(CLUSTER_MFA, cluster_rpc_mfa). -define(CLUSTER_COMMIT, cluster_rpc_commit). +-define(DEFAULT_INIT_TXN_ID, -1). -record(cluster_rpc_mfa, { tnx_id :: pos_integer(), diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 934d7ef7a..5bc330afa 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -44,7 +44,9 @@ read_next_mfa/1, trans_query/1, trans_status/0, - on_leave_clean/0 + on_leave_clean/0, + get_commit_lag/0, + get_commit_lag/1 ]). -export([ @@ -231,13 +233,29 @@ make_initiate_call_req(M, F, A) -> -spec get_node_tnx_id(node()) -> integer(). get_node_tnx_id(Node) -> case mnesia:wread({?CLUSTER_COMMIT, Node}) of - [] -> -1; + [] -> ?DEFAULT_INIT_TXN_ID; [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId end. +%% @doc Return the commit lag of *this* node. +-spec get_commit_lag() -> #{my_id := pos_integer(), latest := pos_integer()}. +get_commit_lag() -> + {atomic, Result} = transaction(fun ?MODULE:get_commit_lag/1, [node()]), + Result. + +get_commit_lag(Node) -> + LatestId = get_cluster_tnx_id(), + LatestNode = + case mnesia:read(?CLUSTER_MFA, LatestId) of + [#?CLUSTER_MFA{initiator = N}] -> N; + _ -> undefined + end, + MyId = get_node_tnx_id(Node), + #{my_id => MyId, latest => LatestId, latest_node => LatestNode}. + %% Checks whether the Mnesia tables used by this module are waiting to %% be loaded and from where. --spec get_tables_status() -> #{atom() => {waiting, [node()]} | {disc | network, node()}}. +-spec get_tables_status() -> #{atom() => {waiting, [node()]} | {loaded, local | node()}}. get_tables_status() -> maps:from_list([ {Tab, do_get_tables_status(Tab)} @@ -249,13 +267,16 @@ do_get_tables_status(Tab) -> TabNodes = proplists:get_value(all_nodes, Props), KnownDown = mnesia_recover:get_mnesia_downs(), LocalNode = node(), - case proplists:get_value(load_node, Props) of + %% load_node. Returns the name of the node that Mnesia loaded the table from. + %% The structure of the returned value is unspecified, but can be useful for debugging purposes. + LoadedFrom = proplists:get_value(load_node, Props), + case LoadedFrom of unknown -> {waiting, TabNodes -- [LocalNode | KnownDown]}; LocalNode -> - {disc, LocalNode}; + {loaded, local}; Node -> - {network, Node} + {loaded, Node} end. %% Regardless of what MFA is returned, consider it a success), diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 7addb3823..a2a2dc649 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -26,8 +26,6 @@ -include_lib("emqx/include/logger.hrl"). -include("emqx_conf.hrl"). --define(DEFAULT_INIT_TXN_ID, -1). - start(_StartType, _StartArgs) -> try ok = init_conf() @@ -52,31 +50,32 @@ unset_config_loaded() -> %% This function is named 'override' due to historical reasons. get_override_config_file() -> Node = node(), + Data = #{ + wall_clock => erlang:statistics(wall_clock), + node => Node, + release => emqx_release:version_with_prefix() + }, case emqx_app:init_load_done() of false -> - {error, #{node => Node, msg => "init_conf_load_not_done"}}; + {error, Data#{msg => "init_conf_load_not_done"}}; true -> case erlang:whereis(emqx_config_handler) of undefined -> - {error, #{node => Node, msg => "emqx_config_handler_not_ready"}}; + {error, #{msg => "emqx_config_handler_not_ready"}}; _ -> Fun = fun() -> TnxId = emqx_cluster_rpc:get_node_tnx_id(Node), - WallClock = erlang:statistics(wall_clock), Conf = emqx_config_handler:get_raw_cluster_override_conf(), HasDeprecateFile = emqx_config:has_deprecated_file(), - #{ - wall_clock => WallClock, + Data#{ conf => Conf, tnx_id => TnxId, - node => Node, - has_deprecated_file => HasDeprecateFile, - release => emqx_release:version_with_prefix() + has_deprecated_file => HasDeprecateFile } end, case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of {atomic, Res} -> {ok, Res}; - {aborted, Reason} -> {error, #{node => Node, msg => Reason}} + {aborted, Reason} -> {error, Data#{msg => Reason}} end end end. @@ -105,7 +104,7 @@ init_load(TnxId) -> ok = emqx_app:set_config_loader(emqx_conf), ok; Module -> - ?SLOG(debug, #{ + ?SLOG(info, #{ msg => "skip_init_config_load", reason => "Some application has set another config loader", loader => Module @@ -126,7 +125,7 @@ sync_cluster_conf() -> case cluster_nodes() of [] -> %% The first core nodes is self. - ?SLOG(debug, #{ + ?SLOG(info, #{ msg => "skip_sync_cluster_conf", reason => "This is a single node, or the first node in the cluster" }), @@ -138,70 +137,94 @@ sync_cluster_conf() -> %% @private Some core nodes are running, try to sync the cluster config from them. sync_cluster_conf2(Nodes) -> {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes), - {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), - NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0), - case (Failed =/= [] orelse NotReady =/= []) of - true when Ready =/= [] -> - %% Some core nodes failed to reply. - Warning = #{ - nodes => Nodes, - failed => Failed, - not_ready => NotReady, - msg => "ignored_nodes_when_sync_cluster_conf" - }, - ?SLOG(warning, Warning); - true when Failed =/= [] -> - %% There are core nodes running but no one was able to reply. - ?SLOG(error, #{ - msg => "failed_to_sync_cluster_conf", - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }); - true -> - %% There are core nodes booting up - ?SLOG(info, #{ - msg => "peer_not_ready_for_config_sync", - reason => "The 'not_ready' peer node(s) are loading configs", - nodes => Nodes, - not_ready => NotReady - }); - false -> - ok - end, - case Ready of + {Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), + LogData = #{peer_nodes => Nodes, self_node => node()}, + case Failed ++ NotReady of [] -> - case should_proceed_with_boot() of - true -> - %% Act as if this node is alone, so it can - %% finish the boot sequence and load the - %% config for other nodes to copy it. - ?SLOG(info, #{ - msg => "skip_sync_cluster_conf", - loading_from_disk => true, - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }), - {ok, ?DEFAULT_INIT_TXN_ID}; - false -> - %% retry in some time - Jitter = rand:uniform(2000), - Timeout = 10000 + Jitter, - timer:sleep(Timeout), - ?SLOG(warning, #{ - msg => "sync_cluster_conf_retry", - timeout => Timeout, - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }), - sync_cluster_conf() - end; + ok; _ -> + ?SLOG( + warning, + LogData#{ + msg => "cluster_config_fetch_failures", + failed_nodes => Failed, + booting_nodes => NotReady + } + ) + end, + MyRole = mria_rlog:role(), + case Ready of + [] when MyRole =:= replicant -> + %% replicant should never boot without copying from a core node + delay_and_retry(LogData#{role => replicant}); + [] -> + %% none of the nodes are ready, either delay-and-retry or boot without wait + TableStatus = tx_commit_table_status(), + sync_cluster_conf5(TableStatus, LogData); + _ -> + %% copy config from the best node in the Ready list sync_cluster_conf3(Ready) end. +%% None of the peer nodes are responsive, so we have to make a decision +%% based on the commit lagging (if the commit table is loaded). +%% +%% It could be that the peer nodes are also booting up, +%% however we cannot always wait because it may run into a dead-lock. +%% +%% Giving up wait here implies that some changes made to the peer node outside +%% of cluster-rpc MFAs will be lost. +%% e.g. stop all nodes, manually change cluster.hocon in one node +%% then boot all nodes around the same time, the changed cluster.hocon may +%% get lost if the node happen to copy config from others. +sync_cluster_conf5({loaded, local}, LogData) -> + ?SLOG(info, LogData#{ + msg => "skip_copy_cluster_config_from_peer_nodes", + explain => "Commit table loaded locally from disk, assuming that I have the latest config" + }), + {ok, ?DEFAULT_INIT_TXN_ID}; +sync_cluster_conf5({loaded, From}, LogData) -> + case get_commit_lag() of + #{my_id := MyId, latest := Latest} = Lagging when MyId >= Latest orelse Latest =:= 0 -> + ?SLOG(info, LogData#{ + msg => "skip_copy_cluster_config_from_peer_nodes", + explain => "I have the latest cluster config commit", + commit_loaded_from => From, + lagging_info => Lagging + }), + {ok, ?DEFAULT_INIT_TXN_ID}; + #{my_id := _MyId, latest := _Latest} = Lagging -> + delay_and_retry(LogData#{lagging_info => Lagging, commit_loaded_from => From}) + end; +sync_cluster_conf5({waiting, Waiting}, LogData) -> + %% this may never happen? since we waited for table before + delay_and_retry(LogData#{table_pending => Waiting}). + +get_commit_lag() -> + emqx_cluster_rpc:get_commit_lag(). + +delay_and_retry(LogData) -> + Timeout = sync_delay_timeout(), + ?SLOG(warning, LogData#{ + msg => "sync_cluster_conf_retry", + explain => + "Cannot boot alone due to potentially stale data. " + "Will try sync cluster config again after delay", + delay => Timeout + }), + timer:sleep(Timeout), + sync_cluster_conf(). + +-ifdef(TEST). +sync_delay_timeout() -> + Jitter = rand:uniform(200), + 1_000 + Jitter. +-else. +sync_delay_timeout() -> + Jitter = rand:uniform(2000), + 10_000 + Jitter. +-endif. + %% @private Filter out the nodes which are running a newer version than this node. sync_cluster_conf3(Ready) -> case lists:filter(fun is_older_or_same_version/1, Ready) of @@ -217,10 +240,10 @@ sync_cluster_conf3(Ready) -> ), ?SLOG(warning, #{ msg => "all_available_nodes_running_newer_version", - hint => - "Booting this node without syncing cluster config from peer core nodes " + explain => + "Booting this node without syncing cluster config from core nodes " "because other nodes are running a newer version", - peer_nodes => NodesAndVersions + versions => NodesAndVersions }), {ok, ?DEFAULT_INIT_TXN_ID}; Ready2 -> @@ -246,7 +269,7 @@ sync_cluster_conf4(Ready) -> [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready), #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, HasDeprecatedFile = has_deprecated_file(Info), - ?SLOG(debug, #{ + ?SLOG(info, #{ msg => "sync_cluster_conf_success", synced_from_node => Node, has_deprecated_file => HasDeprecatedFile, @@ -263,19 +286,9 @@ sync_cluster_conf4(Ready) -> ok = sync_data_from_node(Node), {ok, TnxId}. -should_proceed_with_boot() -> +tx_commit_table_status() -> TablesStatus = emqx_cluster_rpc:get_tables_status(), - LocalNode = node(), - case maps:get(?CLUSTER_COMMIT, TablesStatus) of - {disc, LocalNode} -> - %% Loading locally; let this node finish its boot sequence - %% so others can copy the config from this one. - true; - _ -> - %% Loading from another node or still waiting for nodes to - %% be up. Try again. - false - end. + maps:get(?CLUSTER_COMMIT, TablesStatus). conf_sort({ok, #{tnx_id := Id1}}, {ok, #{tnx_id := Id2}}) when Id1 > Id2 -> true; conf_sort({ok, #{tnx_id := Id, wall_clock := W1}}, {ok, #{tnx_id := Id, wall_clock := W2}}) -> diff --git a/changes/ce/fix-11897.en.md b/changes/ce/fix-11897.en.md new file mode 100644 index 000000000..383129b4a --- /dev/null +++ b/changes/ce/fix-11897.en.md @@ -0,0 +1 @@ +Fix config sync wait-loop race condition when cluster nodes boot around the same time.