328 lines
12 KiB
Erlang
328 lines
12 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_conf_app).
|
|
|
|
-behaviour(application).
|
|
|
|
-export([start/2, stop/1]).
|
|
-export([get_override_config_file/0]).
|
|
-export([sync_data_from_node/0]).
|
|
-export([unset_config_loaded/0]).
|
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include("emqx_conf.hrl").
|
|
|
|
%% @doc Application start callback.
%% Waits for the cluster-RPC tables, loads the configuration (syncing it from
%% peer core nodes when possible), refreshes the logger config and finally
%% starts the top-level supervisor.
start(_StartType, _StartArgs) ->
    RpcTables = emqx_cluster_rpc:create_tables(),
    ok = mria:wait_for_tables(RpcTables),
    try
        ok = init_conf()
    catch
        Class:Reason:Stacktrace ->
            %% The logger is not fully set up yet, so report straight to stderr.
            io:format(
                standard_error,
                "Failed to load config~n~p~n~p~n~p~n",
                [Class, Reason, Stacktrace]
            ),
            %% NOTE(review): init:stop/1 starts an orderly shutdown and returns,
            %% so execution continues below. Confirm that is intended.
            init:stop(1)
    end,
    ok = emqx_config_logger:refresh_config(),
    emqx_conf_sup:start_link().
|
|
|
|
%% @doc Application stop callback; nothing needs to be cleaned up here.
stop(_AppState) -> ok.
|
|
|
|
%% @doc Clear the "config loaded" flag by delegating to
%% emqx_app:unset_config_loaded/0.
%% emqx_conf relies on this flag to synchronize configuration between nodes,
%% so it must be cleaned up when the emqx application is restarted by mria.
unset_config_loaded() -> emqx_app:unset_config_loaded().
|
|
|
|
%% @doc Read the cluster config from the local node.
%% The name says "override" for historical reasons.
%%
%% Returns `{ok, Info}` or `{error, Info}`. Both carry `wall_clock`, `node` and
%% `release`. On success `Info` also has `conf`, `tnx_id` and
%% `has_deprecated_file`; on error it has a `msg` field explaining why.
get_override_config_file() ->
    Node = node(),
    Data = #{
        wall_clock => erlang:statistics(wall_clock),
        node => Node,
        release => emqx_release:version_with_prefix()
    },
    case emqx_app:init_load_done() of
        true ->
            read_local_cluster_conf(Node, Data);
        false ->
            {error, Data#{msg => "init_conf_load_not_done"}}
    end.

%% @private Second stage of get_override_config_file/0.
%% Requires the emqx_config_handler process to be registered, then reads the
%% tnx_id, raw cluster config and deprecated-file flag inside a read-only
%% transaction on the cluster-RPC shard.
read_local_cluster_conf(Node, Data) ->
    case erlang:whereis(emqx_config_handler) of
        undefined ->
            {error, Data#{msg => "emqx_config_handler_not_ready"}};
        _Handler ->
            ReadFun = fun() ->
                %% Keep the reads in this order: tnx id first, then config.
                TnxId = emqx_cluster_rpc:get_node_tnx_id(Node),
                RawConf = emqx_config_handler:get_raw_cluster_override_conf(),
                HasDeprecated = emqx_config:has_deprecated_file(),
                Data#{
                    conf => RawConf,
                    tnx_id => TnxId,
                    has_deprecated_file => HasDeprecated
                }
            end,
            case mria:ro_transaction(?CLUSTER_RPC_SHARD, ReadFun) of
                {atomic, Info} -> {ok, Info};
                {aborted, Reason} -> {error, Data#{msg => Reason}}
            end
    end.
|
|
|
|
%% @doc Pack the `authz` and `certs` sub-directories of the local data dir
%% (only those that exist) into an in-memory zip archive named "data.zip".
%% Returns `{ok, ZipBinary}`, or the `{error, Reason}` from zip:zip/3.
sync_data_from_node() ->
    DataDir = emqx:data_dir(),
    Wanted = ["authz", "certs"],
    Present = [Sub || Sub <- Wanted, filelib:is_dir(filename:join(DataDir, Sub))],
    ArchiveName = "data.zip",
    case zip:zip(ArchiveName, Present, [memory, {cwd, DataDir}]) of
        {ok, {ArchiveName, ZipBin}} -> {ok, ZipBin};
        {error, _} = Error -> Error
    end.
|
|
|
|
%% ------------------------------------------------------------------------------
|
|
%% Internal functions
|
|
%% ------------------------------------------------------------------------------
|
|
|
|
%% @private Load the local config unless another application has registered a
%% different config loader.
%%
%% When the registered loader is `emqx` or `emqx_conf`, this function:
%%  - loads the config with emqx_config:init_load/1,
%%  - calls emqx_cluster_rpc:maybe_init_tnx_id/2 with TnxId,
%%  - records `emqx_conf` as the loader.
%% Otherwise the load is skipped and logged. Always returns `ok`.
init_load(TnxId) ->
    case emqx_app:get_config_loader() of
        Module when Module == emqx; Module == emqx_conf ->
            ok = emqx_config:init_load(emqx_conf:schema_module()),
            %% Set load config done after update(init) tnx_id.
            ok = emqx_cluster_rpc:maybe_init_tnx_id(node(), TnxId),
            ok = emqx_app:set_config_loader(emqx_conf),
            ok;
        Module ->
            ?SLOG(info, #{
                msg => "skip_init_config_load",
                reason => "Some application has set another config loader",
                loader => Module
            }),
            %% init_conf/0 asserts `ok = init_load(...)`, so return ok explicitly
            %% instead of relying on whatever the log macro evaluates to.
            ok
    end.
|
|
|
|
%% @private Boot-time config initialisation.
%% Calls emqx_cluster_rpc:wait_for_cluster_rpc/0, then syncs the cluster config
%% from peer core nodes (or falls back to the default tnx id), then loads the
%% config locally with that tnx id. Any step not returning the expected value
%% crashes with badmatch; start/2 catches that.
init_conf() ->
    emqx_cluster_rpc:wait_for_cluster_rpc(),
    {ok, InitTnxId} = sync_cluster_conf(),
    ok = init_load(InitTnxId).
|
|
|
|
%% @private The cluster's core nodes, excluding this node.
cluster_nodes() ->
    lists:delete(node(), mria:cluster_nodes(cores)).
|
|
|
|
%% @doc Try to sync the cluster config from the other core nodes.
%% If there are no other core nodes (a single node, or the first core node to
%% boot), nothing is copied and `{ok, ?DEFAULT_INIT_TXN_ID}` is returned.
sync_cluster_conf() ->
    case cluster_nodes() of
        [_ | _] = Peers ->
            sync_cluster_conf2(Peers);
        [] ->
            %% The first core node is this node itself.
            ?SLOG(info, #{
                msg => "skip_sync_cluster_conf",
                reason => "This is a single node, or the first node in the cluster"
            }),
            {ok, ?DEFAULT_INIT_TXN_ID}
    end.
|
|
|
|
%% @private Some core nodes are running; try to sync the cluster config from them.
%%
%% Replies whose first element is `ok` are usable. Any other replies (nodes
%% still booting) and unreachable nodes are logged as one warning. Then:
%%  - no usable reply and this node is a replicant: delay and retry;
%%  - no usable reply on any other role: decide from the commit table status;
%%  - otherwise: copy from the best usable reply.
sync_cluster_conf2(Nodes) ->
    {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes),
    IsReady = fun(Res) -> element(1, Res) =:= ok end,
    {Ready, NotReady} = lists:partition(IsReady, Results),
    LogData = #{peer_nodes => Nodes, self_node => node()},
    case Failed ++ NotReady of
        [] ->
            ok;
        _ ->
            ?SLOG(warning, LogData#{
                msg => "cluster_config_fetch_failures",
                failed_nodes => Failed,
                booting_nodes => NotReady
            })
    end,
    case {Ready, mria_rlog:role()} of
        {[], replicant} ->
            %% A replicant must never boot without copying from a core node.
            delay_and_retry(LogData#{role => replicant});
        {[], _OtherRole} ->
            %% No peer is ready: either delay-and-retry or boot without waiting.
            sync_cluster_conf5(tx_commit_table_status(), LogData);
        {_, _} ->
            %% Copy config from the best node among the ready ones.
            sync_cluster_conf3(Ready)
    end.
|
|
|
|
%% @private No peer node returned a usable config, so decide based on the local
%% commit table status and, when it was loaded from a peer, on the commit lag.
%%
%% The peers may themselves be booting, but waiting for them unconditionally
%% can dead-lock.
%%
%% Giving up the wait means changes made on a peer node outside of cluster-RPC
%% MFAs can be lost. For example: stop all nodes, manually change cluster.hocon
%% on one node, then boot all nodes at around the same time. The changed
%% cluster.hocon may be lost if that node happens to copy the config from
%% another node.
sync_cluster_conf5({loaded, local}, LogData) ->
    ?SLOG(info, LogData#{
        msg => "skip_copy_cluster_config_from_peer_nodes",
        explain => "Commit table loaded locally from disk, assuming that I have the latest config"
    }),
    {ok, ?DEFAULT_INIT_TXN_ID};
sync_cluster_conf5({loaded, From}, LogData) ->
    case get_commit_lag() of
        #{my_id := MyId, latest := Latest} = Lagging when Latest =:= 0; MyId >= Latest ->
            %% Latest is 0 (presumably nothing committed yet), or this node has
            %% already caught up with the latest commit.
            ?SLOG(info, LogData#{
                msg => "skip_copy_cluster_config_from_peer_nodes",
                explain => "I have the latest cluster config commit",
                commit_loaded_from => From,
                lagging_info => Lagging
            }),
            {ok, ?DEFAULT_INIT_TXN_ID};
        #{my_id := _, latest := _} = Lagging ->
            delay_and_retry(LogData#{lagging_info => Lagging, commit_loaded_from => From})
    end;
sync_cluster_conf5({waiting, Waiting}, LogData) ->
    %% Possibly unreachable, since start/2 already waited for the tables.
    delay_and_retry(LogData#{table_pending => Waiting}).
|
|
|
|
%% @private Delegates to emqx_cluster_rpc:get_commit_lag/0.
%% sync_cluster_conf5/2 expects a map with `my_id` and `latest` keys.
get_commit_lag() -> emqx_cluster_rpc:get_commit_lag().
|
|
|
|
%% @private Log why this node cannot boot on its own yet, sleep for a jittered
%% delay, then start the cluster config sync over again.
%% There is no retry cap here; each new attempt may come back to this function.
delay_and_retry(LogData) ->
    Delay = sync_delay_timeout(),
    ?SLOG(warning, LogData#{
        msg => "sync_cluster_conf_retry",
        explain =>
            "Cannot boot alone due to potentially stale data. "
            "Will try sync cluster config again after delay",
        delay => Delay
    }),
    ok = timer:sleep(Delay),
    sync_cluster_conf().
|
|
|
|
-ifdef(TEST).
%% Test builds: retry delay of 1001..1200 ms.
sync_delay_timeout() ->
    1_000 + rand:uniform(200).
-else.
%% Production builds: retry delay of 10001..12000 ms. The random part
%% presumably keeps peers from retrying in lock-step.
sync_delay_timeout() ->
    10_000 + rand:uniform(2000).
-endif.
|
|
|
|
%% @private Drop replies from nodes running a newer release than this node and
%% copy from the best of the remaining ones.
%% If every ready node runs a newer release (likely an older node restarting
%% during a cluster upgrade), boot without syncing their config.
sync_cluster_conf3(Ready) ->
    Candidates = lists:filter(fun is_older_or_same_version/1, Ready),
    case Candidates of
        [_ | _] ->
            sync_cluster_conf4(Candidates);
        [] ->
            ToVersion = fun({ok, #{node := Node, release := Release}}) ->
                #{node => Node, version => Release}
            end,
            %% Computed before the log call on purpose: ?SLOG only evaluates its
            %% data when the level is enabled.
            NodesAndVersions = lists:map(ToVersion, Ready),
            ?SLOG(warning, #{
                msg => "all_available_nodes_running_newer_version",
                explain =>
                    "Booting this node without syncing cluster config from core nodes "
                    "because other nodes are running a newer version",
                versions => NodesAndVersions
            }),
            {ok, ?DEFAULT_INIT_TXN_ID}
    end.
|
|
|
|
%% @private True unless the reply comes from a node running a newer release.
%% Replies without a `release` field, and releases that
%% emqx_release:vsn_compare/1 fails on, are treated as older.
is_older_or_same_version({ok, #{release := RemoteRelease}}) ->
    try emqx_release:vsn_compare(RemoteRelease) of
        newer -> false;
        _NotNewer -> true
    catch
        _:_ ->
            %% A version without the v/e prefix predates v5.1.0/e5.1.0.
            true
    end;
is_older_or_same_version(_) ->
    %% Older versions have no 'release' field.
    true.
|
|
|
|
%% @private Some core nodes replied with their configs successfully.
%% Sorts the replies with conf_sort/2 and uses the first one: saves its raw
%% config as the local cluster override config, copies the data files from
%% that node, and returns `{ok, TnxId}` with the source node's tnx id.
sync_cluster_conf4(Ready) ->
    [{ok, Best} | _] = lists:sort(fun conf_sort/2, Ready),
    #{node := SourceNode, conf := RawConf, tnx_id := TnxId} = Best,
    HasDeprecatedFile = has_deprecated_file(Best),
    ?SLOG(info, #{
        msg => "sync_cluster_conf_success",
        synced_from_node => SourceNode,
        has_deprecated_file => HasDeprecatedFile,
        local_release => emqx_release:version_with_prefix(),
        remote_release => maps:get(release, Best, "before_v5.0.24|e5.0.3"),
        data_dir => emqx:data_dir(),
        tnx_id => TnxId
    }),
    SaveOpts = #{override_to => cluster},
    ok = emqx_config:save_to_override_conf(HasDeprecatedFile, RawConf, SaveOpts),
    ok = sync_data_from_node(SourceNode),
    {ok, TnxId}.
|
|
|
|
%% @private Status of the cluster-RPC commit table, as reported by
%% emqx_cluster_rpc:get_tables_status/0. Raises badkey if it is missing.
%% sync_cluster_conf5/2 expects `{loaded, From}` or `{waiting, Tables}`.
tx_commit_table_status() ->
    maps:get(?CLUSTER_COMMIT, emqx_cluster_rpc:get_tables_status()).
|
|
|
|
%% @private Ordering fun for lists:sort/2 that puts the preferred config source
%% first.
%% A higher tnx_id wins. For equal tnx_ids, the larger wall_clock statistic
%% wins; it is a `{Total, SinceLastCall}` tuple, so this means the longer
%% running node.
%% lists:sort/2 requires the fun to return true when A is ordered before *or
%% equal to* B, hence `>=` in the tie-breaker.
conf_sort({ok, #{tnx_id := Id1}}, {ok, #{tnx_id := Id2}}) when Id1 > Id2 -> true;
conf_sort({ok, #{tnx_id := Id, wall_clock := W1}}, {ok, #{tnx_id := Id, wall_clock := W2}}) ->
    W1 >= W2;
conf_sort({ok, _}, {ok, _}) ->
    false.
|
|
|
|
%% @private Fetch the data-dir archive from Node and unpack it into the local
%% data dir. zip:unzip/2 overwrites existing files with the same names by
%% default.
%% Returns `ok`. Any failure, whether fetching the archive or unpacking it, is
%% logged at emergency level and then raised with error/1.
sync_data_from_node(Node) ->
    case emqx_conf_proto_v3:sync_data_from_node(Node) of
        {ok, DataBin} ->
            case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of
                {ok, []} ->
                    ?SLOG(debug, #{node => Node, msg => "sync_data_from_node_empty_response"});
                {ok, Files} ->
                    ?SLOG(debug, #{
                        node => Node,
                        msg => "sync_data_from_node_non_empty_response",
                        files => Files
                    });
                {error, Reason} ->
                    %% A corrupt archive or an unwritable data dir must not
                    %% surface as a bare, unlogged case_clause crash.
                    ?SLOG(emergency, #{
                        node => Node,
                        msg => "sync_data_from_node_unzip_failed",
                        reason => Reason
                    }),
                    error({unzip_failed, Reason})
            end,
            ok;
        Error ->
            ?SLOG(emergency, #{node => Node, msg => "sync_data_from_node_failed", reason => Error}),
            error(Error)
    end.
|
|
|
|
%% @private Whether the peer that produced Info has a deprecated config file.
%% Newer peers report this in the `has_deprecated_file` field. Older peers
%% predate emqx_config:has_deprecated_file/0; for them a non-empty Conf means
%% the deprecated file was found.
has_deprecated_file(#{conf := Conf} = Info) ->
    maps:get(has_deprecated_file, Info, Conf =/= #{}).
|