From 50e7d5d2ecfa50b244c5b3ec865ffaa34a50cf82 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 16 Jun 2023 17:03:39 -0300 Subject: [PATCH] fix(nolink_apply): avoid sending late replies to caller Due to race conditions, it's possible that the caller of `pmap`/`nolink_apply` might receive a late reply, e.g. when a timeout occurs while the resource manager is checking a resource's health: ``` 19:18:23.084 [error] [data: ..., event_data: {#Reference<0.3247872820.3887857670.131018>, {:normal, [false, true, true, true, true, true]}}, event_type: :info, msg: :ignore_all_other_events, state: :connected] ``` Using an alias and also checking for the race condition in the `after` block (like [`gen`](https://github.com/erlang/otp/blob/a76bf63197dbf41d9179413b26597afeeb46ff30/lib/stdlib/src/gen.erl#L270-L277) does), we avoid polluting the caller's mailbox with late replies. --- apps/emqx_utils/src/emqx_utils.erl | 24 ++++++++++++++--- apps/emqx_utils/test/emqx_utils_SUITE.erl | 32 +++++++++++++++++++++++ 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 6cf85fb5d..830845b60 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -20,6 +20,8 @@ %% [TODO] Cleanup so the instruction below is not necessary. -elvis([{elvis_style, god_modules, disable}]). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -export([ merge_opts/2, maybe_apply/2, @@ -432,7 +434,7 @@ nolink_apply(Fun) -> nolink_apply(Fun, infinity). -spec nolink_apply(function(), timer:timeout()) -> term(). 
nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> Caller = self(), - ResRef = make_ref(), + ResRef = alias([reply]), Middleman = erlang:spawn( fun() -> process_flag(trap_exit, true), @@ -446,7 +448,8 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> C:E:S -> {exception, {C, E, S}} end, - _ = erlang:send(Caller, {ResRef, Res}), + _ = erlang:send(ResRef, {ResRef, Res}), + ?tp(pmap_middleman_sent_response, #{}), exit(normal) end ), @@ -460,7 +463,7 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> exit(normal); {'EXIT', Worker, Reason} -> %% worker exited with some reason other than 'normal' - _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), + _ = erlang:send(ResRef, {ResRef, {'EXIT', Reason}}), exit(normal) end end @@ -473,8 +476,21 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> {ResRef, {'EXIT', Reason}} -> exit(Reason) after Timeout -> + %% possible race condition: a message was received just as we enter the after + %% block. + ?tp(pmap_timeout, #{}), + unalias(ResRef), exit(Middleman, kill), - exit(timeout) + receive + {ResRef, {normal, Result}} -> + Result; + {ResRef, {exception, {C, E, S}}} -> + erlang:raise(C, E, S); + {ResRef, {'EXIT', Reason}} -> + exit(Reason) + after 0 -> + exit(timeout) + end end. safe_to_existing_atom(In) -> diff --git a/apps/emqx_utils/test/emqx_utils_SUITE.erl b/apps/emqx_utils/test/emqx_utils_SUITE.erl index 6c6bcf8d3..12e99c917 100644 --- a/apps/emqx_utils/test/emqx_utils_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(SOCKOPTS, [ binary, @@ -208,3 +209,34 @@ t_pmap_exception(_) -> [{2, 3}, {3, 4}, error] ) ). 
+ +t_pmap_late_reply(_) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := pmap_middleman_sent_response}, + #{?snk_kind := pmap_timeout} + ), + Timeout = 100, + Res = + catch emqx_utils:pmap( + fun(_) -> + process_flag(trap_exit, true), + timer:sleep(3 * Timeout), + done + end, + [1, 2, 3], + Timeout + ), + receive + {Ref, LateReply} when is_reference(Ref) -> + ct:fail("should not receive late reply: ~p", [LateReply]) + after (5 * Timeout) -> + ok + end, + ?assertMatch([done, done, done], Res), + ok + end, + [] + ), + ok.