From 0dae3f43a9edbcf088b8220c490781f5d97fa251 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 5 May 2022 12:54:24 +0300 Subject: [PATCH] fix(mongodb): fix mongodb connection healthcheck --- apps/emqx/src/emqx_misc.erl | 48 ++++++++++++++++++- apps/emqx/test/emqx_bpapi_static_checks.erl | 2 +- apps/emqx/test/emqx_misc_SUITE.erl | 32 +++++++++++++ .../src/emqx_connector_mongo.erl | 6 ++- 4 files changed, 84 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl index f216b61e3..2809237d8 100644 --- a/apps/emqx/src/emqx_misc.erl +++ b/apps/emqx/src/emqx_misc.erl @@ -49,7 +49,9 @@ ipv6_probe/1, gen_id/0, gen_id/1, - explain_posix/1 + explain_posix/1, + pmap/2, + pmap/3 ]). -export([ @@ -61,6 +63,8 @@ -define(SHORT, 8). +-define(DEFAULT_PMAP_TIMEOUT, 5000). + %% @doc Parse v4 or v6 string format address to tuple. %% `Host' itself is returned if it's not an ip string. maybe_parse_ip(Host) -> @@ -371,6 +375,25 @@ explain_posix(estale) -> "Stale remote file handle"; explain_posix(exdev) -> "Cross-domain link"; explain_posix(NotPosix) -> NotPosix. +-spec pmap(fun((A) -> B), list(A)) -> list(B | {error, term()}). +pmap(Fun, List) when is_function(Fun, 1), is_list(List) -> + pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT). + +-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B | {error, term()}). +pmap(Fun, List, Timeout) when + is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0 +-> + Self = self(), + Pids = lists:map( + fun(El) -> + spawn_link( + fun() -> pmap_exec(Self, Fun, El, Timeout) end + ) + end, + List + ), + pmap_gather(Pids). + %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ @@ -395,6 +418,29 @@ pad(L, 0) -> pad(L, Count) -> pad([$0 | L], Count - 1). +pmap_gather([Pid | Pids]) -> + receive + {Pid, Result} -> [Result | pmap_gather(Pids)] + end; +pmap_gather([]) -> + []. + +pmap_exec(CallerPid, Fun, El, Timeout) -> + ExecPid = self(), + {Pid, Ref} = spawn_monitor(fun() -> + Result = Fun(El), + ExecPid ! {result, self(), Result} + end), + ExecResult = + receive + {result, Pid, Result} -> Result; + {'DOWN', Ref, process, Pid, Reason} -> {error, Reason} + after Timeout -> + true = erlang:exit(Pid, kill), + {error, timeout} + end, + CallerPid ! {ExecPid, ExecResult}. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx/test/emqx_bpapi_static_checks.erl b/apps/emqx/test/emqx_bpapi_static_checks.erl index 124c06896..c889ba772 100644 --- a/apps/emqx/test/emqx_bpapi_static_checks.erl +++ b/apps/emqx/test/emqx_bpapi_static_checks.erl @@ -59,7 +59,7 @@ %% List of functions in the RPC backend modules that we can ignore: % TODO: handle pmap --define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0, emqx_rpc:unwrap_erpc/1, rpc:pmap/3"). +-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0, emqx_rpc:unwrap_erpc/1"). %% List of business-layer functions that are exempt from the checks: -define(EXEMPTIONS, % Reason: legacy code. A fun and a QC query are diff --git a/apps/emqx/test/emqx_misc_SUITE.erl b/apps/emqx/test/emqx_misc_SUITE.erl index b85bbcb0e..3b83dce8f 100644 --- a/apps/emqx/test/emqx_misc_SUITE.erl +++ b/apps/emqx/test/emqx_misc_SUITE.erl @@ -170,3 +170,35 @@ t_now_to_ms(_) -> t_gen_id(_) -> ?assertEqual(10, length(emqx_misc:gen_id(10))), ?assertEqual(20, length(emqx_misc:gen_id(20))). + +t_pmap(_) -> + ?assertEqual( + [5, 7, 9], + emqx_misc:pmap( + fun({A, B}) -> A + B end, + [{2, 3}, {3, 4}, {4, 5}] + ) + ), + + ?assertEqual( + [5, 7, {error, timeout}], + emqx_misc:pmap( + fun + (timeout) -> ct:sleep(1000); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, timeout], + 100 + ) + ), + + ?assertMatch( + [5, 7, {error, _}], + emqx_misc:pmap( + fun + (error) -> error(exc); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, error] + ) + ). diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 238140700..b2877c617 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -241,8 +241,10 @@ on_get_status(InstId, #{poolname := PoolName} = _State) -> health_check(PoolName) -> Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - Status = rpc:pmap({?MODULE, check_worker_health}, [], Workers), - length(Status) > 0 andalso lists:all(fun(St) -> St end, Status). + Status = emqx_misc:pmap( + fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1) + ), + length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status). %% ===================================================================