diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 6cf85fb5d..830845b60 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -20,6 +20,8 @@ %% [TODO] Cleanup so the instruction below is not necessary. -elvis([{elvis_style, god_modules, disable}]). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -export([ merge_opts/2, maybe_apply/2, @@ -432,7 +434,7 @@ nolink_apply(Fun) -> nolink_apply(Fun, infinity). -spec nolink_apply(function(), timer:timeout()) -> term(). nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> Caller = self(), - ResRef = make_ref(), + ResRef = alias([reply]), Middleman = erlang:spawn( fun() -> process_flag(trap_exit, true), @@ -446,7 +448,8 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> C:E:S -> {exception, {C, E, S}} end, - _ = erlang:send(Caller, {ResRef, Res}), + _ = erlang:send(ResRef, {ResRef, Res}), + ?tp(pmap_middleman_sent_response, #{}), exit(normal) end ), @@ -460,7 +463,7 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> exit(normal); {'EXIT', Worker, Reason} -> %% worker exited with some reason other than 'normal' - _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), + _ = erlang:send(ResRef, {ResRef, {'EXIT', Reason}}), exit(normal) end end @@ -473,8 +476,21 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> {ResRef, {'EXIT', Reason}} -> exit(Reason) after Timeout -> + %% possible race condition: a message was received just as we enter the after + %% block. + ?tp(pmap_timeout, #{}), + unalias(ResRef), exit(Middleman, kill), - exit(timeout) + receive + {ResRef, {normal, Result}} -> + Result; + {ResRef, {exception, {C, E, S}}} -> + erlang:raise(C, E, S); + {ResRef, {'EXIT', Reason}} -> + exit(Reason) + after 0 -> + exit(timeout) + end end. safe_to_existing_atom(In) -> diff --git a/apps/emqx_utils/test/emqx_utils_SUITE.erl b/apps/emqx_utils/test/emqx_utils_SUITE.erl index 6c6bcf8d3..12e99c917 100644 --- a/apps/emqx_utils/test/emqx_utils_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(SOCKOPTS, [ binary, @@ -208,3 +209,34 @@ t_pmap_exception(_) -> [{2, 3}, {3, 4}, error] ) ). + +t_pmap_late_reply(_) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := pmap_middleman_sent_response}, + #{?snk_kind := pmap_timeout} + ), + Timeout = 100, + Res = + catch emqx_utils:pmap( + fun(_) -> + process_flag(trap_exit, true), + timer:sleep(3 * Timeout), + done + end, + [1, 2, 3], + Timeout + ), + receive + {Ref, LateReply} when is_reference(Ref) -> + ct:fail("should not receive late reply: ~p", [LateReply]) + after (5 * Timeout) -> + ok + end, + ?assertMatch([done, done, done], Res), + ok + end, + [] + ), + ok.