diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 4df08af98..4d1f73890 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,7 +28,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.7"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.14"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.15"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.1.0"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index aa8f03ae5..5931413cb 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -26,6 +26,13 @@ update_vips/0 ]). +-export([open_ports_check/0]). + +-ifdef(TEST). +-export([create_plan/0]). +-endif. + +-include_lib("kernel/include/inet.hrl"). -include_lib("emqx/include/logger.hrl"). %% @doc EMQX boot entrypoint. @@ -42,6 +49,7 @@ start() -> ok = set_backtrace_depth(), start_sysmon(), configure_shard_transports(), + set_mnesia_extra_diagnostic_checks(), ekka:start(), ok. @@ -94,3 +102,111 @@ configure_shard_transports() -> end, maps:to_list(ShardTransports) ). + +set_mnesia_extra_diagnostic_checks() -> + Checks = [{check_open_ports, ok, fun ?MODULE:open_ports_check/0}], + mria_config:set_extra_mnesia_diagnostic_checks(Checks), + ok. + +-define(PORT_PROBE_TIMEOUT, 10_000). +open_ports_check() -> + Plan = create_plan(), + %% 2 ports to check: ekka/epmd and gen_rpc + Timeout = 2 * ?PORT_PROBE_TIMEOUT + 5_000, + try emqx_utils:pmap(fun do_check/1, Plan, Timeout) of + Results -> + verify_results(Results) + catch + Kind:Reason:Stacktrace -> + #{ + msg => "error probing ports", + exception => Kind, + reason => Reason, + stacktrace => Stacktrace + } + end. + +verify_results(Results0) -> + Errors = [ + R + || R = {_Node, #{status := Status}} <- Results0, + Status =/= ok + ], + case Errors of + [] -> + %% all ok + ok; + _ -> + Results1 = maps:from_list(Results0), + #{results => Results1, msg => "some ports are unreachable"} + end. + +create_plan() -> + %% expected core nodes according to mnesia schema + OtherNodes = mnesia:system_info(db_nodes) -- [node()], + lists:map( + fun(N) -> + IPs = node_to_ips(N), + {_GenRPCMod, GenRPCPort} = gen_rpc_helper:get_client_config_per_node(N), + %% 0 or 1 result + EkkaEPMDPort = get_ekka_epmd_port(IPs), + {N, #{ + resolved_ips => IPs, + ports_to_check => [GenRPCPort | EkkaEPMDPort] + }} + end, + OtherNodes + ). + +get_ekka_epmd_port([IP | _]) -> + %% we're currently only checking the first IP, if there are many + case erl_epmd:names(IP) of + {ok, NamePorts} -> + choose_emqx_epmd_port(NamePorts); + _ -> + [] + end; +get_ekka_epmd_port([]) -> + %% failed to get? + []. + +%% filter out remsh and take the first emqx port as epmd/ekka port +choose_emqx_epmd_port([{"emqx" ++ _, Port} | _]) -> + [Port]; +choose_emqx_epmd_port([{_Name, _Port} | Rest]) -> + choose_emqx_epmd_port(Rest); +choose_emqx_epmd_port([]) -> + []. + +do_check({Node, #{resolved_ips := []} = Plan}) -> + {Node, Plan#{status => failed_to_resolve_ip}}; +do_check({Node, #{resolved_ips := [IP | _]} = Plan}) -> + %% check other IPs too? + PortsToCheck = maps:get(ports_to_check, Plan), + PortStatus0 = lists:map(fun(P) -> is_tcp_port_open(IP, P) end, PortsToCheck), + case lists:all(fun(IsOpen) -> IsOpen end, PortStatus0) of + true -> + {Node, Plan#{status => ok}}; + false -> + PortStatus1 = maps:from_list(lists:zip(PortsToCheck, PortStatus0)), + {Node, Plan#{status => bad_ports, open_ports => PortStatus1}} + end. + +node_to_ips(Node) -> + NodeBin0 = atom_to_binary(Node), + HostOrIP = re:replace(NodeBin0, <<"^.+@">>, <<"">>, [{return, list}]), + case inet:gethostbyname(HostOrIP, inet) of + {ok, #hostent{h_addr_list = AddrList}} -> + AddrList; + _ -> + [] + end. + +is_tcp_port_open(IP, Port) -> + case gen_tcp:connect(IP, Port, [], ?PORT_PROBE_TIMEOUT) of + {ok, P} -> + gen_tcp:close(P), + true; + _ -> + false + end. diff --git a/apps/emqx_machine/test/emqx_machine_SUITE.erl b/apps/emqx_machine/test/emqx_machine_SUITE.erl index 224732d1f..3c84194da 100644 --- a/apps/emqx_machine/test/emqx_machine_SUITE.erl +++ b/apps/emqx_machine/test/emqx_machine_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -67,6 +68,15 @@ end_per_suite(_Config) -> init_per_testcase(t_custom_shard_transports, Config) -> OldConfig = application:get_env(emqx_machine, custom_shard_transports), [{old_config, OldConfig} | Config]; +init_per_testcase(t_open_ports_check = TestCase, Config) -> + AppSpecs = [emqx], + Cluster = [ + {emqx_machine_SUITE1, #{role => core, apps => AppSpecs}}, + {emqx_machine_SUITE2, #{role => core, apps => AppSpecs}}, + {emqx_machine_SUITE3, #{role => replicant, apps => AppSpecs}} + ], + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}), + [{nodes, Nodes} | Config]; init_per_testcase(_TestCase, Config) -> Config. @@ -80,6 +90,10 @@ end_per_testcase(t_custom_shard_transports, Config) -> application:unset_env(emqx_machine, custom_shard_transports) end, ok; +end_per_testcase(t_open_ports_check, Config) -> + Nodes = ?config(nodes, Config), + ok = emqx_cth_cluster:stop(Nodes), + ok; end_per_testcase(_TestCase, _Config) -> ok. @@ -112,3 +126,51 @@ t_node_status(_Config) -> }, jsx:decode(JSON) ). + +t_open_ports_check(Config) -> + [Core1, Core2, Replicant] = ?config(nodes, Config), + + Plan = erpc:call(Core1, emqx_machine, create_plan, []), + ?assertMatch( + [{Core2, #{ports_to_check := [_GenRPC0, _Ekka0], resolved_ips := [_]}}], + Plan + ), + [{Core2, #{ports_to_check := [GenRPCPort, EkkaPort], resolved_ips := [_]}}] = Plan, + ?assertMatch( + [{Core1, #{ports_to_check := [_GenRPC1, _Ekka1], resolved_ips := [_]}}], + erpc:call(Core2, emqx_machine, create_plan, []) + ), + ?assertMatch( + [], + erpc:call(Replicant, emqx_machine, create_plan, []) + ), + + ?assertEqual(ok, erpc:call(Core1, emqx_machine, open_ports_check, [])), + ?assertEqual(ok, erpc:call(Core2, emqx_machine, open_ports_check, [])), + ?assertEqual(ok, erpc:call(Replicant, emqx_machine, open_ports_check, [])), + + ok = emqx_cth_cluster:stop_node(Core2), + + ?assertEqual(ok, erpc:call(Replicant, emqx_machine, open_ports_check, [])), + ?assertMatch( + #{ + msg := "some ports are unreachable", + results := + #{ + Core2 := + #{ + open_ports := #{ + GenRPCPort := _, + EkkaPort := _ + }, + ports_to_check := [_, _], + resolved_ips := [_], + status := bad_ports + } + } + }, + erpc:call(Core1, emqx_machine, open_ports_check, []), + #{core2 => Core2} + ), + + ok. diff --git a/changes/ce/feat-11637.en.md b/changes/ce/feat-11637.en.md new file mode 100644 index 000000000..29a51d142 --- /dev/null +++ b/changes/ce/feat-11637.en.md @@ -0,0 +1,3 @@ +Added an extra diagnostic to help debug issues when mnesia is waiting for tables. + +Updated libraries: `ekka` -> 0.15.15, `mria` -> 0.6.4. diff --git a/mix.exs b/mix.exs index fdd740b01..0edb28f9b 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.7", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-1", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.15.14", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.15.15", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.1.0", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.13", override: true}, diff --git a/rebar.config b/rebar.config index fd3f9820d..8cd65d04a 100644 --- a/rebar.config +++ b/rebar.config @@ -62,7 +62,7 @@ , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.7"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-1"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.14"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.15"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.1.0"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.13"}}}