Merge pull request #11897 from zmstone/1107-fix-cluster-conf-sync-wait-loop
1107 fix cluster conf sync wait loop
Commit f95058a3e7
@@ -21,6 +21,7 @@
 -define(CLUSTER_MFA, cluster_rpc_mfa).
 -define(CLUSTER_COMMIT, cluster_rpc_commit).
+-define(DEFAULT_INIT_TXN_ID, -1).
 
 -record(cluster_rpc_mfa, {
     tnx_id :: pos_integer(),
@@ -44,7 +44,9 @@
     read_next_mfa/1,
     trans_query/1,
     trans_status/0,
-    on_leave_clean/0
+    on_leave_clean/0,
+    get_commit_lag/0,
+    get_commit_lag/1
 ]).
 
 -export([
@@ -231,13 +233,29 @@ make_initiate_call_req(M, F, A) ->
 -spec get_node_tnx_id(node()) -> integer().
 get_node_tnx_id(Node) ->
     case mnesia:wread({?CLUSTER_COMMIT, Node}) of
-        [] -> -1;
+        [] -> ?DEFAULT_INIT_TXN_ID;
         [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId
     end.
 
+%% @doc Return the commit lag of *this* node.
+-spec get_commit_lag() -> #{my_id := pos_integer(), latest := pos_integer()}.
+get_commit_lag() ->
+    {atomic, Result} = transaction(fun ?MODULE:get_commit_lag/1, [node()]),
+    Result.
+
+get_commit_lag(Node) ->
+    LatestId = get_cluster_tnx_id(),
+    LatestNode =
+        case mnesia:read(?CLUSTER_MFA, LatestId) of
+            [#?CLUSTER_MFA{initiator = N}] -> N;
+            _ -> undefined
+        end,
+    MyId = get_node_tnx_id(Node),
+    #{my_id => MyId, latest => LatestId, latest_node => LatestNode}.
+
 %% Checks whether the Mnesia tables used by this module are waiting to
 %% be loaded and from where.
--spec get_tables_status() -> #{atom() => {waiting, [node()]} | {disc | network, node()}}.
+-spec get_tables_status() -> #{atom() => {waiting, [node()]} | {loaded, local | node()}}.
 get_tables_status() ->
     maps:from_list([
         {Tab, do_get_tables_status(Tab)}
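Note on the new API above: get_commit_lag/1 compares a node's committed transaction id against the latest id recorded in the ?CLUSTER_MFA table, so a booting node can tell whether its local commit log is behind the cluster. A minimal sketch of how a caller might act on the returned map; the module and function below are hypothetical and not part of this diff:

    -module(commit_lag_example).
    -export([up_to_date/0]).

    %% Hypothetical helper: true when this node has already applied the
    %% latest known cluster-RPC commit. Relies only on the map shape
    %% produced by get_commit_lag/1 in the hunk above.
    up_to_date() ->
        case emqx_cluster_rpc:get_commit_lag() of
            #{my_id := MyId, latest := Latest} when MyId >= Latest ->
                true;
            #{} ->
                false
        end.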
@@ -249,13 +267,16 @@ do_get_tables_status(Tab) ->
     TabNodes = proplists:get_value(all_nodes, Props),
     KnownDown = mnesia_recover:get_mnesia_downs(),
     LocalNode = node(),
-    case proplists:get_value(load_node, Props) of
+    %% load_node. Returns the name of the node that Mnesia loaded the table from.
+    %% The structure of the returned value is unspecified, but can be useful for debugging purposes.
+    LoadedFrom = proplists:get_value(load_node, Props),
+    case LoadedFrom of
         unknown ->
             {waiting, TabNodes -- [LocalNode | KnownDown]};
         LocalNode ->
-            {disc, LocalNode};
+            {loaded, local};
         Node ->
-            {network, Node}
+            {loaded, Node}
     end.
 
 %% Regardless of what MFA is returned, consider it a success),
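With the spec change above, callers of get_tables_status/0 now see {loaded, local} when Mnesia restored the table from the local disc copy and {loaded, Node} when it was copied from a peer, instead of the old {disc, _} / {network, _} tuples. A small illustrative sketch of matching the new shapes (hypothetical module, not part of this diff):

    -module(table_status_example).
    -export([describe/1]).

    %% Hypothetical: turn one per-table status value from
    %% emqx_cluster_rpc:get_tables_status/0 into a readable tuple.
    describe({waiting, Nodes}) ->
        {waiting_for_copy_from, Nodes};
    describe({loaded, local}) ->
        {loaded_from_local_disc, node()};
    describe({loaded, Node}) ->
        {loaded_over_network_from, Node}.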
@@ -26,8 +26,6 @@
 -include_lib("emqx/include/logger.hrl").
 -include("emqx_conf.hrl").
 
--define(DEFAULT_INIT_TXN_ID, -1).
-
 start(_StartType, _StartArgs) ->
     try
         ok = init_conf()
@@ -52,31 +50,32 @@ unset_config_loaded() ->
 %% This function is named 'override' due to historical reasons.
 get_override_config_file() ->
     Node = node(),
+    Data = #{
+        wall_clock => erlang:statistics(wall_clock),
+        node => Node,
+        release => emqx_release:version_with_prefix()
+    },
     case emqx_app:init_load_done() of
         false ->
-            {error, #{node => Node, msg => "init_conf_load_not_done"}};
+            {error, Data#{msg => "init_conf_load_not_done"}};
         true ->
             case erlang:whereis(emqx_config_handler) of
                 undefined ->
-                    {error, #{node => Node, msg => "emqx_config_handler_not_ready"}};
+                    {error, Data#{msg => "emqx_config_handler_not_ready"}};
                 _ ->
                     Fun = fun() ->
                         TnxId = emqx_cluster_rpc:get_node_tnx_id(Node),
-                        WallClock = erlang:statistics(wall_clock),
                         Conf = emqx_config_handler:get_raw_cluster_override_conf(),
                         HasDeprecateFile = emqx_config:has_deprecated_file(),
-                        #{
-                            wall_clock => WallClock,
+                        Data#{
                             conf => Conf,
                             tnx_id => TnxId,
-                            node => Node,
-                            has_deprecated_file => HasDeprecateFile,
-                            release => emqx_release:version_with_prefix()
+                            has_deprecated_file => HasDeprecateFile
                         }
                     end,
                     case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of
                         {atomic, Res} -> {ok, Res};
-                        {aborted, Reason} -> {error, #{node => Node, msg => Reason}}
+                        {aborted, Reason} -> {error, Data#{msg => Reason}}
                     end
             end
     end.
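The refactor above gives every return path of get_override_config_file/0 the same base Data map (wall_clock, node, release), extended with msg on failure or with conf, tnx_id and has_deprecated_file on success. A hedged sketch of how the requesting side could classify one peer reply; the module below is hypothetical and only assumes the shapes visible in this hunk:

    -module(override_reply_example).
    -export([classify/1]).

    %% Hypothetical: classify a single reply as produced by the code above.
    classify({ok, #{node := Node, conf := _Conf, tnx_id := TnxId}}) ->
        {ready, Node, TnxId};
    classify({error, #{node := Node, msg := Msg}}) ->
        {not_ready, Node, Msg}.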
@@ -105,7 +104,7 @@ init_load(TnxId) ->
             ok = emqx_app:set_config_loader(emqx_conf),
             ok;
         Module ->
-            ?SLOG(debug, #{
+            ?SLOG(info, #{
                 msg => "skip_init_config_load",
                 reason => "Some application has set another config loader",
                 loader => Module
@@ -126,7 +125,7 @@ sync_cluster_conf() ->
     case cluster_nodes() of
         [] ->
             %% The first core nodes is self.
-            ?SLOG(debug, #{
+            ?SLOG(info, #{
                 msg => "skip_sync_cluster_conf",
                 reason => "This is a single node, or the first node in the cluster"
             }),
@@ -138,70 +137,94 @@ sync_cluster_conf() ->
 %% @private Some core nodes are running, try to sync the cluster config from them.
 sync_cluster_conf2(Nodes) ->
     {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes),
-    {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
-    NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
-    case (Failed =/= [] orelse NotReady =/= []) of
-        true when Ready =/= [] ->
-            %% Some core nodes failed to reply.
-            Warning = #{
-                nodes => Nodes,
-                failed => Failed,
-                not_ready => NotReady,
-                msg => "ignored_nodes_when_sync_cluster_conf"
-            },
-            ?SLOG(warning, Warning);
-        true when Failed =/= [] ->
-            %% There are core nodes running but no one was able to reply.
-            ?SLOG(error, #{
-                msg => "failed_to_sync_cluster_conf",
-                nodes => Nodes,
-                failed => Failed,
-                not_ready => NotReady
-            });
-        true ->
-            %% There are core nodes booting up
-            ?SLOG(info, #{
-                msg => "peer_not_ready_for_config_sync",
-                reason => "The 'not_ready' peer node(s) are loading configs",
-                nodes => Nodes,
-                not_ready => NotReady
-            });
-        false ->
-            ok
-    end,
-    case Ready of
+    {Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
+    LogData = #{peer_nodes => Nodes, self_node => node()},
+    case Failed ++ NotReady of
         [] ->
-            case should_proceed_with_boot() of
-                true ->
-                    %% Act as if this node is alone, so it can
-                    %% finish the boot sequence and load the
-                    %% config for other nodes to copy it.
-                    ?SLOG(info, #{
-                        msg => "skip_sync_cluster_conf",
-                        loading_from_disk => true,
-                        nodes => Nodes,
-                        failed => Failed,
-                        not_ready => NotReady
-                    }),
-                    {ok, ?DEFAULT_INIT_TXN_ID};
-                false ->
-                    %% retry in some time
-                    Jitter = rand:uniform(2000),
-                    Timeout = 10000 + Jitter,
-                    timer:sleep(Timeout),
-                    ?SLOG(warning, #{
-                        msg => "sync_cluster_conf_retry",
-                        timeout => Timeout,
-                        nodes => Nodes,
-                        failed => Failed,
-                        not_ready => NotReady
-                    }),
-                    sync_cluster_conf()
-            end;
+            ok;
         _ ->
+            ?SLOG(
+                warning,
+                LogData#{
+                    msg => "cluster_config_fetch_failures",
+                    failed_nodes => Failed,
+                    booting_nodes => NotReady
+                }
+            )
+    end,
+    MyRole = mria_rlog:role(),
+    case Ready of
+        [] when MyRole =:= replicant ->
+            %% replicant should never boot without copying from a core node
+            delay_and_retry(LogData#{role => replicant});
+        [] ->
+            %% none of the nodes are ready, either delay-and-retry or boot without wait
+            TableStatus = tx_commit_table_status(),
+            sync_cluster_conf5(TableStatus, LogData);
+        _ ->
+            %% copy config from the best node in the Ready list
             sync_cluster_conf3(Ready)
     end.
 
+%% None of the peer nodes are responsive, so we have to make a decision
+%% based on the commit lagging (if the commit table is loaded).
+%%
+%% It could be that the peer nodes are also booting up,
+%% however we cannot always wait because it may run into a dead-lock.
+%%
+%% Giving up wait here implies that some changes made to the peer node outside
+%% of cluster-rpc MFAs will be lost.
+%% e.g. stop all nodes, manually change cluster.hocon in one node
+%% then boot all nodes around the same time, the changed cluster.hocon may
+%% get lost if the node happen to copy config from others.
+sync_cluster_conf5({loaded, local}, LogData) ->
+    ?SLOG(info, LogData#{
+        msg => "skip_copy_cluster_config_from_peer_nodes",
+        explain => "Commit table loaded locally from disk, assuming that I have the latest config"
+    }),
+    {ok, ?DEFAULT_INIT_TXN_ID};
+sync_cluster_conf5({loaded, From}, LogData) ->
+    case get_commit_lag() of
+        #{my_id := MyId, latest := Latest} = Lagging when MyId >= Latest orelse Latest =:= 0 ->
+            ?SLOG(info, LogData#{
+                msg => "skip_copy_cluster_config_from_peer_nodes",
+                explain => "I have the latest cluster config commit",
+                commit_loaded_from => From,
+                lagging_info => Lagging
+            }),
+            {ok, ?DEFAULT_INIT_TXN_ID};
+        #{my_id := _MyId, latest := _Latest} = Lagging ->
+            delay_and_retry(LogData#{lagging_info => Lagging, commit_loaded_from => From})
+    end;
+sync_cluster_conf5({waiting, Waiting}, LogData) ->
+    %% this may never happen? since we waited for table before
+    delay_and_retry(LogData#{table_pending => Waiting}).
+
+get_commit_lag() ->
+    emqx_cluster_rpc:get_commit_lag().
+
+delay_and_retry(LogData) ->
+    Timeout = sync_delay_timeout(),
+    ?SLOG(warning, LogData#{
+        msg => "sync_cluster_conf_retry",
+        explain =>
+            "Cannot boot alone due to potentially stale data. "
+            "Will try sync cluster config again after delay",
+        delay => Timeout
+    }),
+    timer:sleep(Timeout),
+    sync_cluster_conf().
+
+-ifdef(TEST).
+sync_delay_timeout() ->
+    Jitter = rand:uniform(200),
+    1_000 + Jitter.
+-else.
+sync_delay_timeout() ->
+    Jitter = rand:uniform(2000),
+    10_000 + Jitter.
+-endif.
+
 %% @private Filter out the nodes which are running a newer version than this node.
 sync_cluster_conf3(Ready) ->
     case lists:filter(fun is_older_or_same_version/1, Ready) of
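Taken together, the new flow when no peer reply is usable is: a replicant always delays and retries, while a core node first checks how the commit table was loaded and only boots alone when the table came from local disc or when its own commit id is not behind the cluster's latest. A condensed, hedged restatement of that branching as a pure function (a sketch only; the authoritative paths are sync_cluster_conf2/1, sync_cluster_conf5/2 and delay_and_retry/1 above):

    -module(boot_decision_example).
    -export([decide/3]).

    %% Hypothetical pure mirror of the branching above.
    %% Role        :: core | replicant (as returned by mria_rlog:role/0)
    %% TableStatus :: {loaded, local} | {loaded, node()} | {waiting, [node()]}
    %% Lag         :: #{my_id := integer(), latest := integer()}
    decide(replicant, _TableStatus, _Lag) ->
        %% a replicant must copy config from a core node; keep waiting
        retry_later;
    decide(core, {loaded, local}, _Lag) ->
        %% commit table restored from local disc: assume our config is current
        boot_alone;
    decide(core, {loaded, _From}, #{my_id := MyId, latest := Latest}) when
        MyId >= Latest orelse Latest =:= 0
    ->
        %% table copied from a peer, but we already hold the latest commit
        boot_alone;
    decide(core, {loaded, _From}, _Lag) ->
        %% lagging behind the cluster: sleep with jitter, then retry the sync
        retry_later;
    decide(core, {waiting, _Nodes}, _Lag) ->
        retry_later.

In the retry_later branches the real code sleeps for sync_delay_timeout/0 (roughly 10-12 s in production builds, 1-1.2 s under TEST) and then calls sync_cluster_conf/0 again.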
@@ -217,10 +240,10 @@ sync_cluster_conf3(Ready) ->
             ),
             ?SLOG(warning, #{
                 msg => "all_available_nodes_running_newer_version",
-                hint =>
-                    "Booting this node without syncing cluster config from peer core nodes "
+                explain =>
+                    "Booting this node without syncing cluster config from core nodes "
                     "because other nodes are running a newer version",
-                peer_nodes => NodesAndVersions
+                versions => NodesAndVersions
             }),
             {ok, ?DEFAULT_INIT_TXN_ID};
         Ready2 ->
@@ -246,7 +269,7 @@ sync_cluster_conf4(Ready) ->
     [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready),
     #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
     HasDeprecatedFile = has_deprecated_file(Info),
-    ?SLOG(debug, #{
+    ?SLOG(info, #{
         msg => "sync_cluster_conf_success",
         synced_from_node => Node,
         has_deprecated_file => HasDeprecatedFile,
@@ -263,19 +286,9 @@ sync_cluster_conf4(Ready) ->
     ok = sync_data_from_node(Node),
     {ok, TnxId}.
 
-should_proceed_with_boot() ->
+tx_commit_table_status() ->
     TablesStatus = emqx_cluster_rpc:get_tables_status(),
-    LocalNode = node(),
-    case maps:get(?CLUSTER_COMMIT, TablesStatus) of
-        {disc, LocalNode} ->
-            %% Loading locally; let this node finish its boot sequence
-            %% so others can copy the config from this one.
-            true;
-        _ ->
-            %% Loading from another node or still waiting for nodes to
-            %% be up. Try again.
-            false
-    end.
+    maps:get(?CLUSTER_COMMIT, TablesStatus).
 
 conf_sort({ok, #{tnx_id := Id1}}, {ok, #{tnx_id := Id2}}) when Id1 > Id2 -> true;
 conf_sort({ok, #{tnx_id := Id, wall_clock := W1}}, {ok, #{tnx_id := Id, wall_clock := W2}}) ->
@@ -0,0 +1 @@
+Fix config sync wait-loop race condition when cluster nodes boot around the same time.