Merge pull request #11078 from thalesmg/fix-pmap-alias-master

fix(nolink_apply): avoid sending late replies to caller
This commit is contained in:
Thales Macedo Garitezi 2023-06-19 10:32:43 -03:00 committed by GitHub
commit 7f1e8baa76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 4 deletions

View File

@ -20,6 +20,8 @@
%% [TODO] Cleanup so the instruction below is not necessary. %% [TODO] Cleanup so the instruction below is not necessary.
-elvis([{elvis_style, god_modules, disable}]). -elvis([{elvis_style, god_modules, disable}]).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([ -export([
merge_opts/2, merge_opts/2,
maybe_apply/2, maybe_apply/2,
@ -432,7 +434,7 @@ nolink_apply(Fun) -> nolink_apply(Fun, infinity).
-spec nolink_apply(function(), timer:timeout()) -> term(). -spec nolink_apply(function(), timer:timeout()) -> term().
nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
Caller = self(), Caller = self(),
ResRef = make_ref(), ResRef = alias([reply]),
Middleman = erlang:spawn( Middleman = erlang:spawn(
fun() -> fun() ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
@ -446,7 +448,8 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
C:E:S -> C:E:S ->
{exception, {C, E, S}} {exception, {C, E, S}}
end, end,
_ = erlang:send(Caller, {ResRef, Res}), _ = erlang:send(ResRef, {ResRef, Res}),
?tp(pmap_middleman_sent_response, #{}),
exit(normal) exit(normal)
end end
), ),
@ -460,7 +463,7 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
exit(normal); exit(normal);
{'EXIT', Worker, Reason} -> {'EXIT', Worker, Reason} ->
%% worker exited with some reason other than 'normal' %% worker exited with some reason other than 'normal'
_ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), _ = erlang:send(ResRef, {ResRef, {'EXIT', Reason}}),
exit(normal) exit(normal)
end end
end end
@ -473,8 +476,21 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
{ResRef, {'EXIT', Reason}} -> {ResRef, {'EXIT', Reason}} ->
exit(Reason) exit(Reason)
after Timeout -> after Timeout ->
%% possible race condition: a message was received just as we enter the after
%% block.
?tp(pmap_timeout, #{}),
unalias(ResRef),
exit(Middleman, kill), exit(Middleman, kill),
exit(timeout) receive
{ResRef, {normal, Result}} ->
Result;
{ResRef, {exception, {C, E, S}}} ->
erlang:raise(C, E, S);
{ResRef, {'EXIT', Reason}} ->
exit(Reason)
after 0 ->
exit(timeout)
end
end. end.
safe_to_existing_atom(In) -> safe_to_existing_atom(In) ->

View File

@ -20,6 +20,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(SOCKOPTS, [ -define(SOCKOPTS, [
binary, binary,
@ -208,3 +209,34 @@ t_pmap_exception(_) ->
[{2, 3}, {3, 4}, error] [{2, 3}, {3, 4}, error]
) )
). ).
t_pmap_late_reply(_) ->
?check_trace(
begin
?force_ordering(
#{?snk_kind := pmap_middleman_sent_response},
#{?snk_kind := pmap_timeout}
),
Timeout = 100,
Res =
catch emqx_utils:pmap(
fun(_) ->
process_flag(trap_exit, true),
timer:sleep(3 * Timeout),
done
end,
[1, 2, 3],
Timeout
),
receive
{Ref, LateReply} when is_reference(Ref) ->
ct:fail("should not receive late reply: ~p", [LateReply])
after (5 * Timeout) ->
ok
end,
?assertMatch([done, done, done], Res),
ok
end,
[]
),
ok.