diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 813ab84e8..6633d4b4e 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -45,6 +45,8 @@ , index_of/2 , maybe_parse_ip/1 , ipv6_probe/1 + , pmap/2 + , pmap/3 ]). -export([ bin2hexstr_A_F/1 @@ -55,7 +57,13 @@ -export([ is_sane_id/1 ]). +-export([ + nolink_apply/1, + nolink_apply/2 +]). + -define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$"). +-define(DEFAULT_PMAP_TIMEOUT, 5000). -spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}. is_sane_id(Str) -> @@ -328,6 +336,110 @@ hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0; hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10; hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10. +%% @doc Like lists:map/2, only the callback function is evaluated +%% concurrently. +-spec pmap(fun((A) -> B), list(A)) -> list(B). +pmap(Fun, List) when is_function(Fun, 1), is_list(List) -> + pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT). + +-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B). +pmap(Fun, List, Timeout) when + is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0 +-> + nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout). + +%% @doc Delegate a function to a worker process. +%% The function may spawn_link other processes but we do not +%% want the caller process to be linked. +%% This is done by isolating the possible link with a not-linked +%% middleman process. +nolink_apply(Fun) -> nolink_apply(Fun, infinity). + +%% @doc Same as `nolink_apply/1', with a timeout. +-spec nolink_apply(function(), timer:timeout()) -> term(). +nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> + Caller = self(), + ResRef = make_ref(), + Middleman = erlang:spawn(make_middleman_fn(Caller, Fun, ResRef)), + receive + {ResRef, {normal, Result}} -> + Result; + {ResRef, {exception, {C, E, S}}} -> + erlang:raise(C, E, S); + {ResRef, {'EXIT', Reason}} -> + exit(Reason) + after Timeout -> + exit(Middleman, kill), + exit(timeout) + end. + +-spec make_middleman_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()). +make_middleman_fn(Caller, Fun, ResRef) -> + fun() -> + process_flag(trap_exit, true), + CallerMRef = erlang:monitor(process, Caller), + Worker = erlang:spawn_link(make_worker_fn(Caller, Fun, ResRef)), + receive + {'DOWN', CallerMRef, process, _, _} -> + %% For whatever reason, if the caller is dead, + %% there is no reason to continue + exit(Worker, kill), + exit(normal); + {'EXIT', Worker, normal} -> + exit(normal); + {'EXIT', Worker, Reason} -> + %% worker exited with some reason other than 'normal' + _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), + exit(normal) + end + end. + +-spec make_worker_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()). +make_worker_fn(Caller, Fun, ResRef) -> + fun() -> + Res = + try + {normal, Fun()} + catch + C:E:S -> + {exception, {C, E, S}} + end, + _ = erlang:send(Caller, {ResRef, Res}), + exit(normal) + end. + +do_parallel_map(Fun, List) -> + Parent = self(), + PidList = lists:map( + fun(Item) -> + erlang:spawn_link( + fun() -> + Res = + try + {normal, Fun(Item)} + catch + C:E:St -> + {exception, {C, E, St}} + end, + Parent ! {self(), Res} + end + ) + end, + List + ), + lists:foldr( + fun(Pid, Acc) -> + receive + {Pid, {normal, Result}} -> + [Result | Acc]; + {Pid, {exception, {C, E, St}}} -> + erlang:raise(C, E, St) + end + end, + [], + PidList + ). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/test/emqx_misc_SUITE.erl b/test/emqx_misc_SUITE.erl index 0eec55faa..e9dd3e132 100644 --- a/test/emqx_misc_SUITE.erl +++ b/test/emqx_misc_SUITE.erl @@ -146,3 +146,36 @@ t_now_to_secs(_) -> t_now_to_ms(_) -> ?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))). +t_pmap_normal(_) -> + ?assertEqual( + [5, 7, 9], + emqx_misc:pmap( + fun({A, B}) -> A + B end, + [{2, 3}, {3, 4}, {4, 5}] + ) + ). + +t_pmap_timeout(_) -> + ?assertExit( + timeout, + emqx_misc:pmap( + fun + (timeout) -> ct:sleep(1000); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, timeout], + 100 + ) + ). + +t_pmap_exception(_) -> + ?assertError( + foobar, + emqx_misc:pmap( + fun + (error) -> error(foobar); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, error] + ) + ).