chore: port `pmap/nolink_apply` features from master

This commit is contained in:
Thales Macedo Garitezi 2022-09-20 11:37:28 -03:00
parent fe0ba87fd0
commit 83fb479311
2 changed files with 145 additions and 0 deletions

View File

@ -45,6 +45,8 @@
, index_of/2
, maybe_parse_ip/1
, ipv6_probe/1
, pmap/2
, pmap/3
]).
-export([ bin2hexstr_A_F/1
@ -55,7 +57,13 @@
-export([ is_sane_id/1
]).
-export([
nolink_apply/1,
nolink_apply/2
]).
-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$").
-define(DEFAULT_PMAP_TIMEOUT, 5000).
-spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}.
is_sane_id(Str) ->
@ -328,6 +336,110 @@ hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0;
hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10;
hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10.
%% @doc Like lists:map/2, only the callback function is evaluated
%% concurrently.
-spec pmap(fun((A) -> B), list(A)) -> list(B).
pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B).
pmap(Fun, List, Timeout) when
is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0
->
nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout).
%% @doc Delegate a function to a worker process.
%% The function may spawn_link other processes but we do not
%% want the caller process to be linked.
%% This is done by isolating the possible link with a not-linked
%% middleman process.
nolink_apply(Fun) -> nolink_apply(Fun, infinity).
%% @doc Same as `nolink_apply/1', with a timeout.
-spec nolink_apply(function(), timer:timeout()) -> term().
nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
Caller = self(),
ResRef = make_ref(),
Middleman = erlang:spawn(make_middleman_fn(Caller, Fun, ResRef)),
receive
{ResRef, {normal, Result}} ->
Result;
{ResRef, {exception, {C, E, S}}} ->
erlang:raise(C, E, S);
{ResRef, {'EXIT', Reason}} ->
exit(Reason)
after Timeout ->
exit(Middleman, kill),
exit(timeout)
end.
-spec make_middleman_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()).
make_middleman_fn(Caller, Fun, ResRef) ->
fun() ->
process_flag(trap_exit, true),
CallerMRef = erlang:monitor(process, Caller),
Worker = erlang:spawn_link(make_worker_fn(Caller, Fun, ResRef)),
receive
{'DOWN', CallerMRef, process, _, _} ->
%% For whatever reason, if the caller is dead,
%% there is no reason to continue
exit(Worker, kill),
exit(normal);
{'EXIT', Worker, normal} ->
exit(normal);
{'EXIT', Worker, Reason} ->
%% worker exited with some reason other than 'normal'
_ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}),
exit(normal)
end
end.
-spec make_worker_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()).
make_worker_fn(Caller, Fun, ResRef) ->
fun() ->
Res =
try
{normal, Fun()}
catch
C:E:S ->
{exception, {C, E, S}}
end,
_ = erlang:send(Caller, {ResRef, Res}),
exit(normal)
end.
do_parallel_map(Fun, List) ->
Parent = self(),
PidList = lists:map(
fun(Item) ->
erlang:spawn_link(
fun() ->
Res =
try
{normal, Fun(Item)}
catch
C:E:St ->
{exception, {C, E, St}}
end,
Parent ! {self(), Res}
end
)
end,
List
),
lists:foldr(
fun(Pid, Acc) ->
receive
{Pid, {normal, Result}} ->
[Result | Acc];
{Pid, {exception, {C, E, St}}} ->
erlang:raise(C, E, St)
end
end,
[],
PidList
).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

View File

@ -146,3 +146,36 @@ t_now_to_secs(_) ->
t_now_to_ms(_) ->
?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))).
t_pmap_normal(_) ->
?assertEqual(
[5, 7, 9],
emqx_misc:pmap(
fun({A, B}) -> A + B end,
[{2, 3}, {3, 4}, {4, 5}]
)
).
t_pmap_timeout(_) ->
?assertExit(
timeout,
emqx_misc:pmap(
fun
(timeout) -> ct:sleep(1000);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, timeout],
100
)
).
t_pmap_exception(_) ->
?assertError(
foobar,
emqx_misc:pmap(
fun
(error) -> error(foobar);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, error]
)
).