From 4013dd2f145dd748f71bde9c656c8649cb877715 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 1 Apr 2022 15:18:25 +0200 Subject: [PATCH] refactor: refine pmap implementation --- apps/emqx/src/emqx_misc.erl | 124 +++++++++++++----- apps/emqx/test/emqx_misc_SUITE.erl | 18 +-- .../test/emqx_authn_mongo_SUITE.erl | 4 +- .../test/emqx_authn_mongo_tls_SUITE.erl | 4 +- .../test/emqx_authn_mysql_SUITE.erl | 4 +- .../test/emqx_authn_mysql_tls_SUITE.erl | 4 +- .../test/emqx_authn_pgsql_SUITE.erl | 4 +- .../test/emqx_authn_pgsql_tls_SUITE.erl | 4 +- .../test/emqx_authn_redis_SUITE.erl | 4 +- .../test/emqx_authn_redis_tls_SUITE.erl | 4 +- .../src/emqx_connector_mongo.erl | 17 ++- 11 files changed, 129 insertions(+), 62 deletions(-) diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl index 2809237d8..d34ab47e1 100644 --- a/apps/emqx/src/emqx_misc.erl +++ b/apps/emqx/src/emqx_misc.erl @@ -59,8 +59,15 @@ hexstr_to_bin/1 ]). +-export([ + nolink_apply/1, + nolink_apply/2 +]). + -export([clamp/3]). +-dialyzer({nowarn_function, [nolink_apply/2]}). + -define(SHORT, 8). -define(DEFAULT_PMAP_TIMEOUT, 5000). @@ -375,29 +382,101 @@ explain_posix(estale) -> "Stale remote file handle"; explain_posix(exdev) -> "Cross-domain link"; explain_posix(NotPosix) -> NotPosix. --spec pmap(fun((A) -> B), list(A)) -> list(B | {error, term()}). +%% @doc Like lists:map/2, only the callback function is evaluated +%% concurrently. +-spec pmap(fun((A) -> B), list(A)) -> list(B). pmap(Fun, List) when is_function(Fun, 1), is_list(List) -> pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT). --spec pmap(fun((A) -> B), list(A), timeout()) -> list(B | {error, term()}). +-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B). pmap(Fun, List, Timeout) when is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0 -> - Self = self(), - Pids = lists:map( - fun(El) -> - spawn_link( - fun() -> pmap_exec(Self, Fun, El, Timeout) end - ) - end, - List + nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout). + +%% @doc Delegate a function to a worker process. +%% The function may spawn_link other processes but we do not +%% want the caller process to be linked. +%% This is done by isolating the possible link with a not-linked +%% middleman process. +nolink_apply(Fun) -> nolink_apply(Fun, infinity). + +%% @doc Same as `nolink_apply/1', with a timeout. +-spec nolink_apply(function(), timer:timeout()) -> term(). +nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> + Caller = self(), + ResRef = make_ref(), + Middleman = erlang:spawn( + fun() -> + process_flag(trap_exit, true), + CallerMRef = erlang:monitor(process, Caller), + Worker = erlang:spawn_link( + fun() -> + Res = + try + {normal, Fun()} + catch + C:E:S -> + {exception, {C, E, S}} + end, + _ = erlang:send(Caller, {ResRef, Res}), + exit(normal) + end + ), + receive + {'DOWN', CallerMRef, process, _, _} -> + %% For whatever reason, if the caller is dead, + %% there is no reason to continue + exit(Worker, kill), + exit(normal); + {'EXIT', Worker, normal} -> + exit(normal); + {'EXIT', Worker, Reason} -> + %% worker exited with some reason other than 'normal' + _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), + exit(normal) + end + end ), - pmap_gather(Pids). + receive + {ResRef, {normal, Result}} -> + Result; + {ResRef, {exception, {C, E, S}}} -> + erlang:raise(C, E, S); + {ResRef, {'EXIT', Reason}} -> + exit(Reason) + after Timeout -> + exit(Middleman, kill), + exit(timeout) + end. %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ +do_parallel_map(Fun, List) -> + Parent = self(), + PidList = lists:map( + fun(Item) -> + erlang:spawn_link( + fun() -> + Parent ! {self(), Fun(Item)} + end + ) + end, + List + ), + lists:foldr( + fun(Pid, Acc) -> + receive + {Pid, Result} -> + [Result | Acc] + end + end, + [], + PidList + ). + int_to_hex(I, N) when is_integer(I), I >= 0 -> int_to_hex([], I, 1, N). @@ -418,29 +497,6 @@ pad(L, 0) -> pad(L, Count) -> pad([$0 | L], Count - 1). -pmap_gather([Pid | Pids]) -> - receive - {Pid, Result} -> [Result | pmap_gather(Pids)] - end; -pmap_gather([]) -> - []. - -pmap_exec(CallerPid, Fun, El, Timeout) -> - ExecPid = self(), - {Pid, Ref} = spawn_monitor(fun() -> - Result = Fun(El), - ExecPid ! {result, self(), Result} - end), - ExecResult = - receive - {result, Pid, Result} -> Result; - {'DOWN', Ref, process, Pid, Reason} -> {error, Reason} - after Timeout -> - true = erlang:exit(Pid, kill), - {error, timeout} - end, - CallerPid ! {ExecPid, ExecResult}. - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx/test/emqx_misc_SUITE.erl b/apps/emqx/test/emqx_misc_SUITE.erl index 3b83dce8f..c7f7b1be7 100644 --- a/apps/emqx/test/emqx_misc_SUITE.erl +++ b/apps/emqx/test/emqx_misc_SUITE.erl @@ -171,17 +171,18 @@ t_gen_id(_) -> ?assertEqual(10, length(emqx_misc:gen_id(10))), ?assertEqual(20, length(emqx_misc:gen_id(20))). -t_pmap(_) -> +t_pmap_normal(_) -> ?assertEqual( [5, 7, 9], emqx_misc:pmap( fun({A, B}) -> A + B end, [{2, 3}, {3, 4}, {4, 5}] ) - ), + ). - ?assertEqual( - [5, 7, {error, timeout}], +t_pmap_timeout(_) -> + ?assertExit( + timeout, emqx_misc:pmap( fun (timeout) -> ct:sleep(1000); @@ -190,13 +191,14 @@ t_pmap(_) -> [{2, 3}, {3, 4}, timeout], 100 ) - ), + ). - ?assertMatch( - [5, 7, {error, _}], +t_pmap_exception(_) -> + ?assertExit( + {foobar, _}, emqx_misc:pmap( fun - (error) -> error(exc); + (error) -> error(foobar); ({A, B}) -> A + B end, [{2, 3}, {3, 4}, error] diff --git a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl index f49d259ac..1f3667efb 100644 --- a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl @@ -19,8 +19,8 @@ -compile(nowarn_export_all). -compile(export_all). --include("emqx_connector.hrl"). --include("emqx_authn.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_authn/include/emqx_authn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). diff --git a/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl index 274c5b7a4..f46316488 100644 --- a/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl @@ -19,8 +19,8 @@ -compile(nowarn_export_all). -compile(export_all). --include("emqx_connector.hrl"). --include("emqx_authn.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_authn/include/emqx_authn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index 3d2dba895..fd87335b4 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -19,8 +19,8 @@ -compile(nowarn_export_all). -compile(export_all). --include("emqx_connector.hrl"). --include("emqx_authn.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_authn/include/emqx_authn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). diff --git a/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl index d7d655246..f3af66cae 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl @@ -19,8 +19,8 @@ -compile(nowarn_export_all). -compile(export_all). --include("emqx_connector.hrl"). --include("emqx_authn.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_authn/include/emqx_authn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index 269825dce..60b789bcf 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -19,8 +19,8 @@ -compile(nowarn_export_all). -compile(export_all). --include("emqx_connector.hrl"). --include("emqx_authn.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_authn/include/emqx_authn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl index 300cd748e..a1a34927a 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl @@ -19,8 +19,8 @@ -compile(nowarn_export_all). -compile(export_all). --include("emqx_connector.hrl"). --include("emqx_authn.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_authn/include/emqx_authn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl index 58d1cbc0c..163044c21 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl @@ -19,8 +19,8 @@ -compile(nowarn_export_all). -compile(export_all). --include("emqx_connector.hrl"). --include("emqx_authn.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_authn/include/emqx_authn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). diff --git a/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl index 0e00024fb..6ef889166 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl @@ -19,8 +19,8 @@ -compile(nowarn_export_all). -compile(export_all). --include("emqx_connector.hrl"). --include("emqx_authn.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_authn/include/emqx_authn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index b2877c617..5c10d0a6f 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -241,10 +241,19 @@ on_get_status(InstId, #{poolname := PoolName} = _State) -> health_check(PoolName) -> Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - Status = emqx_misc:pmap( - fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1) - ), - length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status). + try + emqx_misc:pmap( + fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1) + ) + of + [_ | _] = Status -> + lists:all(fun(St) -> St =:= true end, Status); + [] -> + false + catch + exit:timeout -> + false + end. %% ===================================================================