Merge pull request #11637 from thalesmg/port-scan-mria-check-m-20230919
feat: add port scan diagnostics to mria waiting for tables checks
This commit is contained in:
commit
5e400575e1
|
@ -28,7 +28,7 @@
|
|||
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
|
||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
|
||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.7"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.14"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.15"}}},
|
||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.1.0"}}},
|
||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}},
|
||||
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
|
||||
|
|
|
@ -26,6 +26,13 @@
|
|||
update_vips/0
|
||||
]).
|
||||
|
||||
-export([open_ports_check/0]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([create_plan/0]).
|
||||
-endif.
|
||||
|
||||
-include_lib("kernel/include/inet.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% @doc EMQX boot entrypoint.
|
||||
|
@ -42,6 +49,7 @@ start() ->
|
|||
ok = set_backtrace_depth(),
|
||||
start_sysmon(),
|
||||
configure_shard_transports(),
|
||||
set_mnesia_extra_diagnostic_checks(),
|
||||
ekka:start(),
|
||||
ok.
|
||||
|
||||
|
@ -94,3 +102,111 @@ configure_shard_transports() ->
|
|||
end,
|
||||
maps:to_list(ShardTransports)
|
||||
).
|
||||
|
||||
set_mnesia_extra_diagnostic_checks() ->
|
||||
Checks = [{check_open_ports, ok, fun ?MODULE:open_ports_check/0}],
|
||||
mria_config:set_extra_mnesia_diagnostic_checks(Checks),
|
||||
ok.
|
||||
|
||||
-define(PORT_PROBE_TIMEOUT, 10_000).
|
||||
open_ports_check() ->
|
||||
Plan = create_plan(),
|
||||
%% 2 ports to check: ekka/epmd and gen_rpc
|
||||
Timeout = 2 * ?PORT_PROBE_TIMEOUT + 5_000,
|
||||
try emqx_utils:pmap(fun do_check/1, Plan, Timeout) of
|
||||
Results ->
|
||||
verify_results(Results)
|
||||
catch
|
||||
Kind:Reason:Stacktrace ->
|
||||
#{
|
||||
msg => "error probing ports",
|
||||
exception => Kind,
|
||||
reason => Reason,
|
||||
stacktrace => Stacktrace
|
||||
}
|
||||
end.
|
||||
|
||||
verify_results(Results0) ->
|
||||
Errors = [
|
||||
R
|
||||
|| R = {_Node, #{status := Status}} <- Results0,
|
||||
Status =/= ok
|
||||
],
|
||||
case Errors of
|
||||
[] ->
|
||||
%% all ok
|
||||
ok;
|
||||
_ ->
|
||||
Results1 = maps:from_list(Results0),
|
||||
#{results => Results1, msg => "some ports are unreachable"}
|
||||
end.
|
||||
|
||||
create_plan() ->
|
||||
%% expected core nodes according to mnesia schema
|
||||
OtherNodes = mnesia:system_info(db_nodes) -- [node()],
|
||||
lists:map(
|
||||
fun(N) ->
|
||||
IPs = node_to_ips(N),
|
||||
{_GenRPCMod, GenRPCPort} = gen_rpc_helper:get_client_config_per_node(N),
|
||||
%% 0 or 1 result
|
||||
EkkaEPMDPort = get_ekka_epmd_port(IPs),
|
||||
{N, #{
|
||||
resolved_ips => IPs,
|
||||
ports_to_check => [GenRPCPort | EkkaEPMDPort]
|
||||
}}
|
||||
end,
|
||||
OtherNodes
|
||||
).
|
||||
|
||||
get_ekka_epmd_port([IP | _]) ->
|
||||
%% we're currently only checking the first IP, if there are many
|
||||
case erl_epmd:names(IP) of
|
||||
{ok, NamePorts} ->
|
||||
choose_emqx_epmd_port(NamePorts);
|
||||
_ ->
|
||||
[]
|
||||
end;
|
||||
get_ekka_epmd_port([]) ->
|
||||
%% failed to get?
|
||||
[].
|
||||
|
||||
%% filter out remsh and take the first emqx port as epmd/ekka port
|
||||
choose_emqx_epmd_port([{"emqx" ++ _, Port} | _]) ->
|
||||
[Port];
|
||||
choose_emqx_epmd_port([{_Name, _Port} | Rest]) ->
|
||||
choose_emqx_epmd_port(Rest);
|
||||
choose_emqx_epmd_port([]) ->
|
||||
[].
|
||||
|
||||
do_check({Node, #{resolved_ips := []} = Plan}) ->
|
||||
{Node, Plan#{status => failed_to_resolve_ip}};
|
||||
do_check({Node, #{resolved_ips := [IP | _]} = Plan}) ->
|
||||
%% check other IPs too?
|
||||
PortsToCheck = maps:get(ports_to_check, Plan),
|
||||
PortStatus0 = lists:map(fun(P) -> is_tcp_port_open(IP, P) end, PortsToCheck),
|
||||
case lists:all(fun(IsOpen) -> IsOpen end, PortStatus0) of
|
||||
true ->
|
||||
{Node, Plan#{status => ok}};
|
||||
false ->
|
||||
PortStatus1 = maps:from_list(lists:zip(PortsToCheck, PortStatus0)),
|
||||
{Node, Plan#{status => bad_ports, open_ports => PortStatus1}}
|
||||
end.
|
||||
|
||||
node_to_ips(Node) ->
|
||||
NodeBin0 = atom_to_binary(Node),
|
||||
HostOrIP = re:replace(NodeBin0, <<"^.+@">>, <<"">>, [{return, list}]),
|
||||
case inet:gethostbyname(HostOrIP, inet) of
|
||||
{ok, #hostent{h_addr_list = AddrList}} ->
|
||||
AddrList;
|
||||
_ ->
|
||||
[]
|
||||
end.
|
||||
|
||||
is_tcp_port_open(IP, Port) ->
|
||||
case gen_tcp:connect(IP, Port, [], ?PORT_PROBE_TIMEOUT) of
|
||||
{ok, P} ->
|
||||
gen_tcp:close(P),
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end.
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
|
@ -67,6 +68,15 @@ end_per_suite(_Config) ->
|
|||
init_per_testcase(t_custom_shard_transports, Config) ->
|
||||
OldConfig = application:get_env(emqx_machine, custom_shard_transports),
|
||||
[{old_config, OldConfig} | Config];
|
||||
init_per_testcase(t_open_ports_check = TestCase, Config) ->
|
||||
AppSpecs = [emqx],
|
||||
Cluster = [
|
||||
{emqx_machine_SUITE1, #{role => core, apps => AppSpecs}},
|
||||
{emqx_machine_SUITE2, #{role => core, apps => AppSpecs}},
|
||||
{emqx_machine_SUITE3, #{role => replicant, apps => AppSpecs}}
|
||||
],
|
||||
Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
|
||||
[{nodes, Nodes} | Config];
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
||||
|
@ -80,6 +90,10 @@ end_per_testcase(t_custom_shard_transports, Config) ->
|
|||
application:unset_env(emqx_machine, custom_shard_transports)
|
||||
end,
|
||||
ok;
|
||||
end_per_testcase(t_open_ports_check, Config) ->
|
||||
Nodes = ?config(nodes, Config),
|
||||
ok = emqx_cth_cluster:stop(Nodes),
|
||||
ok;
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
ok.
|
||||
|
||||
|
@ -112,3 +126,51 @@ t_node_status(_Config) ->
|
|||
},
|
||||
jsx:decode(JSON)
|
||||
).
|
||||
|
||||
t_open_ports_check(Config) ->
|
||||
[Core1, Core2, Replicant] = ?config(nodes, Config),
|
||||
|
||||
Plan = erpc:call(Core1, emqx_machine, create_plan, []),
|
||||
?assertMatch(
|
||||
[{Core2, #{ports_to_check := [_GenRPC0, _Ekka0], resolved_ips := [_]}}],
|
||||
Plan
|
||||
),
|
||||
[{Core2, #{ports_to_check := [GenRPCPort, EkkaPort], resolved_ips := [_]}}] = Plan,
|
||||
?assertMatch(
|
||||
[{Core1, #{ports_to_check := [_GenRPC1, _Ekka1], resolved_ips := [_]}}],
|
||||
erpc:call(Core2, emqx_machine, create_plan, [])
|
||||
),
|
||||
?assertMatch(
|
||||
[],
|
||||
erpc:call(Replicant, emqx_machine, create_plan, [])
|
||||
),
|
||||
|
||||
?assertEqual(ok, erpc:call(Core1, emqx_machine, open_ports_check, [])),
|
||||
?assertEqual(ok, erpc:call(Core2, emqx_machine, open_ports_check, [])),
|
||||
?assertEqual(ok, erpc:call(Replicant, emqx_machine, open_ports_check, [])),
|
||||
|
||||
ok = emqx_cth_cluster:stop_node(Core2),
|
||||
|
||||
?assertEqual(ok, erpc:call(Replicant, emqx_machine, open_ports_check, [])),
|
||||
?assertMatch(
|
||||
#{
|
||||
msg := "some ports are unreachable",
|
||||
results :=
|
||||
#{
|
||||
Core2 :=
|
||||
#{
|
||||
open_ports := #{
|
||||
GenRPCPort := _,
|
||||
EkkaPort := _
|
||||
},
|
||||
ports_to_check := [_, _],
|
||||
resolved_ips := [_],
|
||||
status := bad_ports
|
||||
}
|
||||
}
|
||||
},
|
||||
erpc:call(Core1, emqx_machine, open_ports_check, []),
|
||||
#{core2 => Core2}
|
||||
),
|
||||
|
||||
ok.
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
Added an extra diagnostic to help debug issues when mnesia is waiting for tables.
|
||||
|
||||
Updated libraries: `ekka` -> 0.15.15, `mria` -> 0.6.4.
|
2
mix.exs
2
mix.exs
|
@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
|
||||
{:esockd, github: "emqx/esockd", tag: "5.9.7", override: true},
|
||||
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-1", override: true},
|
||||
{:ekka, github: "emqx/ekka", tag: "0.15.14", override: true},
|
||||
{:ekka, github: "emqx/ekka", tag: "0.15.15", override: true},
|
||||
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.1.0", override: true},
|
||||
{:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
|
||||
{:minirest, github: "emqx/minirest", tag: "1.3.13", override: true},
|
||||
|
|
|
@ -62,7 +62,7 @@
|
|||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
|
||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.7"}}}
|
||||
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-1"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.14"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.15"}}}
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.1.0"}}}
|
||||
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.13"}}}
|
||||
|
|
Loading…
Reference in New Issue