%% emqx/apps/emqx_machine/src/emqx_machine.erl
%%
%% 256 lines
%% 7.6 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_machine).
-export([
start/0,
graceful_shutdown/0,
brutal_shutdown/0,
is_ready/0,
node_status/0,
update_vips/0
]).
-export([open_ports_check/0]).
-export([mria_lb_custom_info/0, mria_lb_custom_info_check/1]).
-ifdef(TEST).
-export([create_plan/0]).
-endif.
-include_lib("kernel/include/inet.hrl").
-include_lib("emqx/include/logger.hrl").
%% @doc EMQX boot entrypoint.
%%
%% Loads the management CLI and sets up OS signal handling (non-Windows
%% only). It then applies backtrace-depth, system-monitor, shard-transport,
%% mnesia-diagnostic and OpenTelemetry settings. Next it registers the mria
%% load-balancer compatibility callbacks and finally starts ekka.
%% Returns `ok'.
start() ->
    emqx_mgmt_cli:load(),
    setup_signal_handlers(os:type()),
    ok = set_backtrace_depth(),
    ok = start_sysmon(),
    configure_shard_transports(),
    set_mnesia_extra_diagnostic_checks(),
    emqx_otel_app:configure_otel_deps(),
    %% mria uses these callbacks to decide whether a replicant is
    %% compatible with a core node. Today that means an exact match of the
    %% `emqx' application version. `emqx' must be loaded so that
    %% get_emqx_vsn/0 can read its `vsn' key.
    _ = application:load(mria),
    _ = application:load(emqx),
    mria_config:register_callback(lb_custom_info, fun ?MODULE:mria_lb_custom_info/0),
    mria_config:register_callback(lb_custom_info_check, fun ?MODULE:mria_lb_custom_info_check/1),
    ekka:start(),
    ok.

%% Signal setup for the current OS. Windows ({win32, nt}) is left alone.
%% Elsewhere SIGHUP is ignored and SIGTERM is set to `handle' (its default).
setup_signal_handlers({win32, nt}) ->
    ok;
setup_signal_handlers(_Unix) ->
    os:set_signal(sighup, ignore),
    os:set_signal(sigterm, handle).
%% @doc Shut the node down gracefully by delegating to
%% emqx_machine_terminator:graceful_wait/0 and returning its result.
graceful_shutdown() -> emqx_machine_terminator:graceful_wait().
%% @doc Stop the runtime immediately via init:stop/0.
%% Only used when booting has failed.
brutal_shutdown() -> init:stop().
%% Apply the `backtrace_depth' setting of the emqx_machine application
%% as the VM-wide backtrace depth. The setting is mandatory: a missing
%% value crashes with badmatch. The previous VM value is discarded.
set_backtrace_depth() ->
    {ok, ConfiguredDepth} = application:get_env(emqx_machine, backtrace_depth),
    _PreviousDepth = erlang:system_flag(backtrace_depth, ConfiguredDepth),
    ok.
%% @doc Report whether boot has completed, as reported by
%% emqx_machine_terminator:is_running/0.
is_ready() -> emqx_machine_terminator:is_running().
%% Configure system_monitor: point it at node_status/0 and register
%% update_vips/0 as a status check. The application is only started when
%% a database sink (non-empty `db_hostname') is configured. In that case
%% system_monitor_pg becomes the callback module. Without a sink there is
%% nothing to report to, so system_monitor is not started. Returns `ok'.
start_sysmon() ->
    _ = application:load(system_monitor),
    application:set_env(system_monitor, node_status_fun, {?MODULE, node_status}),
    application:set_env(system_monitor, status_checks, [{?MODULE, update_vips, false, 10}]),
    start_sysmon_if_sink(application:get_env(system_monitor, db_hostname)).

%% Start system_monitor (as a temporary app) only for a non-empty hostname.
start_sysmon_if_sink({ok, [_ | _]}) ->
    application:set_env(system_monitor, callback_mod, system_monitor_pg),
    _ = application:ensure_all_started(system_monitor, temporary),
    ok;
start_sysmon_if_sink(_NoSink) ->
    ok.
%% Node status reported to system_monitor: a JSON object holding the
%% mria backend and the node's mria role.
node_status() ->
    Backend = mria_rlog:backend(),
    Role = mria_rlog:role(),
    emqx_utils_json:encode(#{backend => Backend, role => Role}).
%% Status check run by system_monitor: passes the result of
%% mria_status:shards_up/0 to system_monitor:add_vip/1.
update_vips() ->
    ShardsUp = mria_status:shards_up(),
    system_monitor:add_vip(ShardsUp).
%% Apply per-shard transport overrides from the emqx_machine
%% `custom_shard_transports' map (shard name as binary => transport).
%% Shard names must already exist as atoms; binary_to_existing_atom/1
%% crashes otherwise. Returns `ok'.
configure_shard_transports() ->
    Overrides = application:get_env(emqx_machine, custom_shard_transports, #{}),
    _ = [
        mria_config:set_shard_transport(binary_to_existing_atom(Shard), Transport)
     || {Shard, Transport} <- maps:to_list(Overrides)
    ],
    ok.
%% Register open_ports_check/0 with mria as an extra mnesia diagnostic
%% check named `check_open_ports', whose expected result is `ok'.
set_mnesia_extra_diagnostic_checks() ->
    OpenPortsCheck = {check_open_ports, ok, fun ?MODULE:open_ports_check/0},
    mria_config:set_extra_mnesia_diagnostic_checks([OpenPortsCheck]),
    ok.
%% Timeout (ms) for a single TCP connection attempt when probing a port.
-define(PORT_PROBE_TIMEOUT, 10_000).

%% @doc Extra mnesia diagnostic check (registered with expected result
%% `ok'). It probes, for every other mnesia db node, the gen_rpc port and
%% the ekka/epmd port over TCP.
%%
%% Returns `ok' when every probed port is open. Otherwise it returns a map
%% with a `msg' plus either the per-node `results' (see verify_results/1)
%% or the `exception'/`reason'/`stacktrace' of a failure.
open_ports_check() ->
    %% At most 2 ports per node (ekka/epmd and gen_rpc), each bounded by
    %% ?PORT_PROBE_TIMEOUT, plus some slack for the pmap overhead.
    Timeout = 2 * ?PORT_PROBE_TIMEOUT + 5_000,
    %% The plan is built inside the `try' as well. It resolves hostnames,
    %% looks up gen_rpc client config and queries remote epmd, so a crash
    %% there is reported as a diagnostic instead of escaping the check.
    try emqx_utils:pmap(fun do_check/1, create_plan(), Timeout) of
        Results ->
            verify_results(Results)
    catch
        Kind:Reason:Stacktrace ->
            #{
                msg => "error probing ports",
                exception => Kind,
                reason => Reason,
                stacktrace => Stacktrace
            }
    end.
%% Collapse per-node probe results into `ok' or an error report.
%% A result counts as failed when it is `{Node, #{status := S}}' with
%% S =/= ok. Entries without a `status' key are ignored. On failure,
%% every result is returned, keyed by node.
verify_results(Results) ->
    case lists:any(fun is_failed_result/1, Results) of
        false ->
            ok;
        true ->
            #{results => maps:from_list(Results), msg => "some ports are unreachable"}
    end.

%% True only for a node result carrying a non-`ok' status.
is_failed_result({_Node, #{status := ok}}) -> false;
is_failed_result({_Node, #{status := _}}) -> true;
is_failed_result(_Other) -> false.
%% Build the probe plan: one `{Node, #{resolved_ips, ports_to_check}}'
%% entry for every mnesia db node other than this one.
create_plan() ->
    %% Core nodes expected according to the mnesia schema.
    Peers = mnesia:system_info(db_nodes) -- [node()],
    [plan_for_node(Peer) || Peer <- Peers].

%% Plan for one node: its resolved IPs plus the gen_rpc client port and,
%% if it can be found, the remote ekka/epmd port (0 or 1 entry).
plan_for_node(Node) ->
    ResolvedIPs = node_to_ips(Node),
    {_GenRPCMod, GenRPCPort} = gen_rpc_helper:get_client_config_per_node(Node),
    EpmdPorts = get_ekka_epmd_port(ResolvedIPs),
    {Node, #{
        resolved_ips => ResolvedIPs,
        ports_to_check => [GenRPCPort | EpmdPorts]
    }}.
%% Ask the epmd on the first resolved IP for its registered names and
%% return the emqx distribution port as a 0- or 1-element list. Any
%% further IPs are not consulted. No IPs, or an epmd query error, gives [].
get_ekka_epmd_port([]) ->
    [];
get_ekka_epmd_port([FirstIP | _OtherIPs]) ->
    case erl_epmd:names(FirstIP) of
        {ok, Registered} -> choose_emqx_epmd_port(Registered);
        _QueryFailed -> []
    end.
%% From epmd `{Name, Port}' registrations, return `[Port]' for the first
%% name starting with "emqx", or [] if there is none. This skips other
%% registrations such as remote-shell nodes.
choose_emqx_epmd_port(NamePorts) ->
    IsEmqxNode = fun
        ({"emqx" ++ _, _Port}) -> true;
        ({_Name, _Port}) -> false
    end,
    case lists:search(IsEmqxNode, NamePorts) of
        {value, {_Name, Port}} -> [Port];
        false -> []
    end.
%% Probe the planned ports of one node on its first resolved IP. The
%% plan map is returned with `status' set to `failed_to_resolve_ip' (no
%% IPs), `ok' (all ports open), or `bad_ports'. In the `bad_ports' case
%% `open_ports' is added as a Port => boolean map.
do_check({Node, #{resolved_ips := []} = Plan}) ->
    {Node, Plan#{status => failed_to_resolve_ip}};
do_check({Node, #{resolved_ips := [FirstIP | _]} = Plan}) ->
    %% Only the first IP is probed for now.
    Ports = maps:get(ports_to_check, Plan),
    Probes = [{Port, is_tcp_port_open(FirstIP, Port)} || Port <- Ports],
    case lists:keymember(false, 2, Probes) of
        false ->
            {Node, Plan#{status => ok}};
        true ->
            {Node, Plan#{status => bad_ports, open_ports => maps:from_list(Probes)}}
    end.
%% Resolve the host part of a node name (everything after the last `@')
%% to its IP addresses. The address family comes from
%% resolve_dist_address_type/0. Returns [] if resolution fails.
node_to_ips(Node) ->
    Host = re:replace(atom_to_binary(Node), <<"^.+@">>, <<"">>, [{return, list}]),
    Family = resolve_dist_address_type(),
    case inet:gethostbyname(Host, Family) of
        {ok, #hostent{h_addr_list = Addresses}} -> Addresses;
        _LookupFailed -> []
    end.
%% True if a TCP connection to IP:Port can be established within
%% ?PORT_PROBE_TIMEOUT ms. The probe connection is closed immediately.
is_tcp_port_open(IP, Port) ->
    connect_outcome(gen_tcp:connect(IP, Port, [], ?PORT_PROBE_TIMEOUT)).

%% Map a gen_tcp:connect/4 result to a boolean, closing any socket.
connect_outcome({ok, Socket}) ->
    gen_tcp:close(Socket),
    true;
connect_outcome(_ConnectFailed) ->
    false.
%% Address family used for hostname resolution, derived from the
%% EKKA_PROTO_DIST_MOD environment variable (default "inet_tcp").
%% "inet6_tcp" and "inet6_tls" map to `inet6'; any other value maps to `inet'.
resolve_dist_address_type() ->
    Inet6DistMods = ["inet6_tcp", "inet6_tls"],
    DistMod = os:getenv("EKKA_PROTO_DIST_MOD", "inet_tcp"),
    case lists:member(DistMod, Inet6DistMods) of
        true -> inet6;
        false -> inet
    end.
%% mria `lb_custom_info' callback; a reference to this function is stored
%% in mria's application environment. It returns the local `emqx'
%% application version (or `undefined' if unknown).
mria_lb_custom_info() -> get_emqx_vsn().
%% mria `lb_custom_info_check' callback; a reference to this function is
%% stored in mria's application environment. It accepts a peer only when
%% its reported version is known and exactly equals the local `emqx'
%% version. `undefined' is always rejected.
mria_lb_custom_info_check(OtherVsn) ->
    OtherVsn =/= undefined andalso get_emqx_vsn() =:= OtherVsn.
%% The `vsn' key of the loaded `emqx' application, or `undefined' when
%% the application is not loaded.
get_emqx_vsn() ->
    VsnLookup = application:get_key(emqx, vsn),
    case VsnLookup of
        undefined -> undefined;
        {ok, Vsn} -> Vsn
    end.