fix(cluster-rpc): boot from local config if table loaded

When EMQX boots up, it tries to get latest config from peer (core type)
nodes, if none of the nodes are replying, the node will decide
to boot with local config (and replay the committed changes) if
the commit table is loaded from disk locally (an indication of the
data being latest), otherwise it will sleep for 1-2 seconds and
retry.

This lead to a race condition, e.g. in a two nodes cluster:

1. node1 boots up
2. node2 boots up and copy mnesia table from node1
3. node1 restart before node2 can sync cluster.hocon from it
4. node1 boots up and copy mnesia table from node2

Now that both node1 and node2 has the mnesia `load_node` pointing
to each other (i.e. not a local disk load).

Prior to this fix, the nodes would wait for each other in a dead loop.

This commit fixes the issue by allowing node to boot
with local config if it does not have a lagging.
This commit is contained in:
Zaiming (Stone) Shi 2023-11-07 17:06:08 +01:00
parent 917809205b
commit f9e9748cec
4 changed files with 130 additions and 94 deletions

View File

@ -21,6 +21,7 @@
-define(CLUSTER_MFA, cluster_rpc_mfa).
-define(CLUSTER_COMMIT, cluster_rpc_commit).
-define(DEFAULT_INIT_TXN_ID, -1).
-record(cluster_rpc_mfa, {
tnx_id :: pos_integer(),

View File

@ -44,7 +44,9 @@
read_next_mfa/1,
trans_query/1,
trans_status/0,
on_leave_clean/0
on_leave_clean/0,
get_commit_lag/0,
get_commit_lag/1
]).
-export([
@ -231,13 +233,29 @@ make_initiate_call_req(M, F, A) ->
-spec get_node_tnx_id(node()) -> integer().
get_node_tnx_id(Node) ->
case mnesia:wread({?CLUSTER_COMMIT, Node}) of
[] -> -1;
[] -> ?DEFAULT_INIT_TXN_ID;
[#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId
end.
%% @doc Return the commit lag of *this* node.
-spec get_commit_lag() -> #{my_id := pos_integer(), latest := pos_integer()}.
get_commit_lag() ->
{atomic, Result} = transaction(fun ?MODULE:get_commit_lag/1, [node()]),
Result.
get_commit_lag(Node) ->
LatestId = get_cluster_tnx_id(),
LatestNode =
case mnesia:read(?CLUSTER_MFA, LatestId) of
[#?CLUSTER_MFA{initiator = N}] -> N;
_ -> undefined
end,
MyId = get_node_tnx_id(Node),
#{my_id => MyId, latest => LatestId, latest_node => LatestNode}.
%% Checks whether the Mnesia tables used by this module are waiting to
%% be loaded and from where.
-spec get_tables_status() -> #{atom() => {waiting, [node()]} | {disc | network, node()}}.
-spec get_tables_status() -> #{atom() => {waiting, [node()]} | {loaded, local | node()}}.
get_tables_status() ->
maps:from_list([
{Tab, do_get_tables_status(Tab)}
@ -249,13 +267,16 @@ do_get_tables_status(Tab) ->
TabNodes = proplists:get_value(all_nodes, Props),
KnownDown = mnesia_recover:get_mnesia_downs(),
LocalNode = node(),
case proplists:get_value(load_node, Props) of
%% load_node. Returns the name of the node that Mnesia loaded the table from.
%% The structure of the returned value is unspecified, but can be useful for debugging purposes.
LoadedFrom = proplists:get_value(load_node, Props),
case LoadedFrom of
unknown ->
{waiting, TabNodes -- [LocalNode | KnownDown]};
LocalNode ->
{disc, LocalNode};
{loaded, local};
Node ->
{network, Node}
{loaded, Node}
end.
%% Regardless of what MFA is returned, consider it a success),

View File

@ -26,8 +26,6 @@
-include_lib("emqx/include/logger.hrl").
-include("emqx_conf.hrl").
-define(DEFAULT_INIT_TXN_ID, -1).
start(_StartType, _StartArgs) ->
try
ok = init_conf()
@ -52,31 +50,32 @@ unset_config_loaded() ->
%% This function is named 'override' due to historical reasons.
get_override_config_file() ->
Node = node(),
Data = #{
wall_clock => erlang:statistics(wall_clock),
node => Node,
release => emqx_release:version_with_prefix()
},
case emqx_app:init_load_done() of
false ->
{error, #{node => Node, msg => "init_conf_load_not_done"}};
{error, Data#{msg => "init_conf_load_not_done"}};
true ->
case erlang:whereis(emqx_config_handler) of
undefined ->
{error, #{node => Node, msg => "emqx_config_handler_not_ready"}};
{error, #{msg => "emqx_config_handler_not_ready"}};
_ ->
Fun = fun() ->
TnxId = emqx_cluster_rpc:get_node_tnx_id(Node),
WallClock = erlang:statistics(wall_clock),
Conf = emqx_config_handler:get_raw_cluster_override_conf(),
HasDeprecateFile = emqx_config:has_deprecated_file(),
#{
wall_clock => WallClock,
Data#{
conf => Conf,
tnx_id => TnxId,
node => Node,
has_deprecated_file => HasDeprecateFile,
release => emqx_release:version_with_prefix()
has_deprecated_file => HasDeprecateFile
}
end,
case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of
{atomic, Res} -> {ok, Res};
{aborted, Reason} -> {error, #{node => Node, msg => Reason}}
{aborted, Reason} -> {error, Data#{msg => Reason}}
end
end
end.
@ -105,7 +104,7 @@ init_load(TnxId) ->
ok = emqx_app:set_config_loader(emqx_conf),
ok;
Module ->
?SLOG(debug, #{
?SLOG(info, #{
msg => "skip_init_config_load",
reason => "Some application has set another config loader",
loader => Module
@ -126,7 +125,7 @@ sync_cluster_conf() ->
case cluster_nodes() of
[] ->
%% The first core nodes is self.
?SLOG(debug, #{
?SLOG(info, #{
msg => "skip_sync_cluster_conf",
reason => "This is a single node, or the first node in the cluster"
}),
@ -138,70 +137,94 @@ sync_cluster_conf() ->
%% @private Some core nodes are running, try to sync the cluster config from them.
sync_cluster_conf2(Nodes) ->
{Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes),
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
case (Failed =/= [] orelse NotReady =/= []) of
true when Ready =/= [] ->
%% Some core nodes failed to reply.
Warning = #{
nodes => Nodes,
failed => Failed,
not_ready => NotReady,
msg => "ignored_nodes_when_sync_cluster_conf"
},
?SLOG(warning, Warning);
true when Failed =/= [] ->
%% There are core nodes running but no one was able to reply.
?SLOG(error, #{
msg => "failed_to_sync_cluster_conf",
nodes => Nodes,
failed => Failed,
not_ready => NotReady
});
true ->
%% There are core nodes booting up
?SLOG(info, #{
msg => "peer_not_ready_for_config_sync",
reason => "The 'not_ready' peer node(s) are loading configs",
nodes => Nodes,
not_ready => NotReady
});
false ->
ok
end,
case Ready of
{Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
LogData = #{peer_nodes => Nodes, self_node => node()},
case Failed ++ NotReady of
[] ->
case should_proceed_with_boot() of
true ->
%% Act as if this node is alone, so it can
%% finish the boot sequence and load the
%% config for other nodes to copy it.
?SLOG(info, #{
msg => "skip_sync_cluster_conf",
loading_from_disk => true,
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
{ok, ?DEFAULT_INIT_TXN_ID};
false ->
%% retry in some time
Jitter = rand:uniform(2000),
Timeout = 10000 + Jitter,
timer:sleep(Timeout),
?SLOG(warning, #{
msg => "sync_cluster_conf_retry",
timeout => Timeout,
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
sync_cluster_conf()
end;
ok;
_ ->
?SLOG(
warning,
LogData#{
msg => "cluster_config_fetch_failures",
failed_nodes => Failed,
booting_nodes => NotReady
}
)
end,
MyRole = mria_rlog:role(),
case Ready of
[] when MyRole =:= replicant ->
%% replicant should never boot without copying from a core node
delay_and_retry(LogData#{role => replicant});
[] ->
%% none of the nodes are ready, either delay-and-retry or boot without wait
TableStatus = tx_commit_table_status(),
sync_cluster_conf5(TableStatus, LogData);
_ ->
%% copy config from the best node in the Ready list
sync_cluster_conf3(Ready)
end.
%% None of the peer nodes are responsive, so we have to make a decision
%% based on the commit lagging (if the commit table is loaded).
%%
%% It could be that the peer nodes are also booting up,
%% however we cannot always wait because it may run into a dead-lock.
%%
%% Giving up wait here implies that some changes made to the peer node outside
%% of cluster-rpc MFAs will be lost.
%% e.g. stop all nodes, manually change cluster.hocon in one node
%% then boot all nodes around the same time, the changed cluster.hocon may
%% get lost if the node happen to copy config from others.
sync_cluster_conf5({loaded, local}, LogData) ->
?SLOG(info, LogData#{
msg => "skip_copy_cluster_config_from_peer_nodes",
explain => "Commit table loaded locally from disk, assuming that I have the latest config"
}),
{ok, ?DEFAULT_INIT_TXN_ID};
sync_cluster_conf5({loaded, From}, LogData) ->
case get_commit_lag() of
#{my_id := MyId, latest := Latest} = Lagging when MyId >= Latest orelse Latest =:= 0 ->
?SLOG(info, LogData#{
msg => "skip_copy_cluster_config_from_peer_nodes",
explain => "I have the latest cluster config commit",
commit_loaded_from => From,
lagging_info => Lagging
}),
{ok, ?DEFAULT_INIT_TXN_ID};
#{my_id := _MyId, latest := _Latest} = Lagging ->
delay_and_retry(LogData#{lagging_info => Lagging, commit_loaded_from => From})
end;
sync_cluster_conf5({waiting, Waiting}, LogData) ->
%% this may never happen? since we waited for table before
delay_and_retry(LogData#{table_pending => Waiting}).
get_commit_lag() ->
emqx_cluster_rpc:get_commit_lag().
delay_and_retry(LogData) ->
Timeout = sync_delay_timeout(),
?SLOG(warning, LogData#{
msg => "sync_cluster_conf_retry",
explain =>
"Cannot boot alone due to potentially stale data. "
"Will try sync cluster config again after delay",
delay => Timeout
}),
timer:sleep(Timeout),
sync_cluster_conf().
-ifdef(TEST).
sync_delay_timeout() ->
Jitter = rand:uniform(200),
1_000 + Jitter.
-else.
sync_delay_timeout() ->
Jitter = rand:uniform(2000),
10_000 + Jitter.
-endif.
%% @private Filter out the nodes which are running a newer version than this node.
sync_cluster_conf3(Ready) ->
case lists:filter(fun is_older_or_same_version/1, Ready) of
@ -217,10 +240,10 @@ sync_cluster_conf3(Ready) ->
),
?SLOG(warning, #{
msg => "all_available_nodes_running_newer_version",
hint =>
"Booting this node without syncing cluster config from peer core nodes "
explain =>
"Booting this node without syncing cluster config from core nodes "
"because other nodes are running a newer version",
peer_nodes => NodesAndVersions
versions => NodesAndVersions
}),
{ok, ?DEFAULT_INIT_TXN_ID};
Ready2 ->
@ -246,7 +269,7 @@ sync_cluster_conf4(Ready) ->
[{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready),
#{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
HasDeprecatedFile = has_deprecated_file(Info),
?SLOG(debug, #{
?SLOG(info, #{
msg => "sync_cluster_conf_success",
synced_from_node => Node,
has_deprecated_file => HasDeprecatedFile,
@ -263,19 +286,9 @@ sync_cluster_conf4(Ready) ->
ok = sync_data_from_node(Node),
{ok, TnxId}.
should_proceed_with_boot() ->
tx_commit_table_status() ->
TablesStatus = emqx_cluster_rpc:get_tables_status(),
LocalNode = node(),
case maps:get(?CLUSTER_COMMIT, TablesStatus) of
{disc, LocalNode} ->
%% Loading locally; let this node finish its boot sequence
%% so others can copy the config from this one.
true;
_ ->
%% Loading from another node or still waiting for nodes to
%% be up. Try again.
false
end.
maps:get(?CLUSTER_COMMIT, TablesStatus).
conf_sort({ok, #{tnx_id := Id1}}, {ok, #{tnx_id := Id2}}) when Id1 > Id2 -> true;
conf_sort({ok, #{tnx_id := Id, wall_clock := W1}}, {ok, #{tnx_id := Id, wall_clock := W2}}) ->

View File

@ -0,0 +1 @@
Fix config sync wait-loop race condition when cluster nodes boot around the same time.