diff --git a/apps/emqx/src/emqx_pmon.erl b/apps/emqx/src/emqx_pmon.erl index 99395d923..426b76da1 100644 --- a/apps/emqx/src/emqx_pmon.erl +++ b/apps/emqx/src/emqx_pmon.erl @@ -16,13 +16,14 @@ -module(emqx_pmon). --compile({no_auto_import, [monitor/3]}). +-compile({no_auto_import, [monitor/3, demonitor/1, demonitor/2]}). -export([new/0]). -export([ monitor/2, monitor/3, + demonitor/1, demonitor/2 ]). @@ -65,13 +66,30 @@ monitor(Pid, Val, PMon = ?PMON(Map)) -> demonitor(Pid, PMon = ?PMON(Map)) -> case maps:find(Pid, Map) of {ok, {Ref, _Val}} -> - %% flush - _ = erlang:demonitor(Ref, [flush]), + ok = demonitor(Ref), ?PMON(maps:remove(Pid, Map)); error -> PMon end. +%% @doc Improved version of erlang:demonitor(Ref, [flush]). +%% Only try to receive the 'DOWN' messages when it might have been sent. +-spec demonitor(reference()) -> ok. +demonitor(Ref) when is_reference(Ref) -> + case erlang:demonitor(Ref, [info]) of + true -> + %% succeeded + ok; + _ -> + %% '_', but not 'false' because this may change in the future according to OTP doc + receive + {'DOWN', Ref, process, _, _} -> + ok + after 0 -> + ok + end + end. + -spec find(pid(), pmon()) -> error | {ok, term()}. find(Pid, ?PMON(Map)) -> case maps:find(Pid, Map) of diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 3d1d0d439..e8ae58f79 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -191,7 +191,7 @@ dispatch_with_ack(SubPid, Topic, Msg) -> {error, timeout} end after - _ = erlang:demonitor(Ref, [flush]) + ok = emqx_pmon:demonitor(Ref) end. with_ack_ref(Msg, SenderRef) -> diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 4b964b9a1..47519bb45 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -185,12 +185,12 @@ call(WsPid, Req, Timeout) when is_pid(WsPid) -> WsPid ! {call, {self(), Mref}, Req}, receive {Mref, Reply} -> - erlang:demonitor(Mref, [flush]), + ok = emqx_pmon:demonitor(Mref), Reply; {'DOWN', Mref, _, _, Reason} -> exit(Reason) after Timeout -> - erlang:demonitor(Mref, [flush]), + ok = emqx_pmon:demonitor(Mref), exit(timeout) end. diff --git a/apps/emqx/test/emqx_pmon_SUITE.erl b/apps/emqx/test/emqx_pmon_SUITE.erl index a47542bf7..9ab0e6231 100644 --- a/apps/emqx/test/emqx_pmon_SUITE.erl +++ b/apps/emqx/test/emqx_pmon_SUITE.erl @@ -60,5 +60,33 @@ t_erase(_) -> ?assertEqual([{self(), val}], Items), ?assertEqual(0, emqx_pmon:count(PMon3)). -% t_erase_all(_) -> -% error('TODO'). +t_demonitor(_) -> + Pid = self(), + Ref1 = erlang:monitor(process, Pid), + Ref2 = erlang:monitor(process, spawn(fun() -> ok end)), + Ref3 = erlang:make_ref(), + ok = emqx_pmon:demonitor(Ref1), + ?assertNot(erlang:demonitor(Ref1, [info])), + ok = emqx_pmon:demonitor(Ref2), + % demonitor twice + ok = emqx_pmon:demonitor(Ref2), + ?assertNot(erlang:demonitor(Ref2, [info])), + % not a monitor ref, should return ok + ok = emqx_pmon:demonitor(Ref3), + ?assertNot(erlang:demonitor(Ref3, [info])), + Pid2 = spawn(fun() -> + receive + stop -> + exit(normal) + end + end), + Ref4 = erlang:monitor(process, Pid2), + Ref5 = erlang:monitor(process, Pid2), + ok = emqx_pmon:demonitor(Ref4), + ?assertNot(erlang:demonitor(Ref4, [info])), + _ = erlang:send(Pid2, stop), + receive + {'DOWN', Ref, process, Pid2, normal} -> + ?assertEqual(Ref, Ref5) + end, + ok.