diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 6c546c789..537e5a85a 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -662,9 +662,9 @@ end). end ). --define(FRAME_PARSE_ERROR(Reason), {frame_parse_error, Reason}). --define(FRAME_SERIALIZE_ERROR(Reason), {frame_serialize_error, Reason}). --define(THROW_FRAME_ERROR(Reason), erlang:throw(?FRAME_PARSE_ERROR(Reason))). --define(THROW_SERIALIZE_ERROR(Reason), erlang:throw(?FRAME_SERIALIZE_ERROR(Reason))). +-define(FRAME_PARSE_ERROR, frame_parse_error). +-define(FRAME_SERIALIZE_ERROR, frame_serialize_error). +-define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})). +-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})). -endif. diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 7d11d07aa..4d0141dd5 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -765,7 +765,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, [Packet | Packets], NState) catch - throw:?FRAME_PARSE_ERROR(Reason) -> + throw:{?FRAME_PARSE_ERROR, Reason} -> ?SLOG(info, #{ reason => Reason, at_state => emqx_frame:describe_state(ParseState), @@ -840,19 +840,19 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> Data catch %% Maybe Never happen. - throw:?FRAME_SERIALIZE_ERROR(Reason) -> + throw:{?FRAME_SERIALIZE_ERROR, Reason} -> ?SLOG(info, #{ reason => Reason, input_packet => Packet }), - erlang:error(?FRAME_SERIALIZE_ERROR(Reason)); + erlang:error({?FRAME_SERIALIZE_ERROR, Reason}); error:Reason:Stacktrace -> ?SLOG(error, #{ input_packet => Packet, exception => Reason, stacktrace => Stacktrace }), - erlang:error(frame_serialize_error) + erlang:error(?FRAME_SERIALIZE_ERROR) end end. diff --git a/apps/emqx/src/emqx_pmon.erl b/apps/emqx/src/emqx_pmon.erl index 99395d923..426b76da1 100644 --- a/apps/emqx/src/emqx_pmon.erl +++ b/apps/emqx/src/emqx_pmon.erl @@ -16,13 +16,14 @@ -module(emqx_pmon). --compile({no_auto_import, [monitor/3]}). +-compile({no_auto_import, [monitor/3, demonitor/1, demonitor/2]}). -export([new/0]). -export([ monitor/2, monitor/3, + demonitor/1, demonitor/2 ]). @@ -65,13 +66,30 @@ monitor(Pid, Val, PMon = ?PMON(Map)) -> demonitor(Pid, PMon = ?PMON(Map)) -> case maps:find(Pid, Map) of {ok, {Ref, _Val}} -> - %% flush - _ = erlang:demonitor(Ref, [flush]), + ok = demonitor(Ref), ?PMON(maps:remove(Pid, Map)); error -> PMon end. +%% @doc Improved version of erlang:demonitor(Ref, [flush]). +%% Only try to receive the 'DOWN' messages when it might have been sent. +-spec demonitor(reference()) -> ok. +demonitor(Ref) when is_reference(Ref) -> + case erlang:demonitor(Ref, [info]) of + true -> + %% succeeded + ok; + _ -> + %% '_', but not 'false' because this may change in the future according to OTP doc + receive + {'DOWN', Ref, process, _, _} -> + ok + after 0 -> + ok + end + end. + -spec find(pid(), pmon()) -> error | {ok, term()}. find(Pid, ?PMON(Map)) -> case maps:find(Pid, Map) of diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 3d1d0d439..e8ae58f79 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -191,7 +191,7 @@ dispatch_with_ack(SubPid, Topic, Msg) -> {error, timeout} end after - _ = erlang:demonitor(Ref, [flush]) + ok = emqx_pmon:demonitor(Ref) end. with_ack_ref(Msg, SenderRef) -> diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 4b964b9a1..37bd8fdcf 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -185,12 +185,12 @@ call(WsPid, Req, Timeout) when is_pid(WsPid) -> WsPid ! {call, {self(), Mref}, Req}, receive {Mref, Reply} -> - erlang:demonitor(Mref, [flush]), + ok = emqx_pmon:demonitor(Mref), Reply; {'DOWN', Mref, _, _, Reason} -> exit(Reason) after Timeout -> - erlang:demonitor(Mref, [flush]), + ok = emqx_pmon:demonitor(Mref), exit(timeout) end. @@ -676,7 +676,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, [{incoming, Packet} | Packets], NState) catch - throw:?FRAME_PARSE_ERROR(Reason) -> + throw:{?FRAME_PARSE_ERROR, Reason} -> ?SLOG(info, #{ reason => Reason, at_state => emqx_frame:describe_state(ParseState), @@ -791,19 +791,19 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> Data catch %% Maybe Never happen. - throw:?FRAME_SERIALIZE_ERROR(Reason) -> + throw:{?FRAME_SERIALIZE_ERROR, Reason} -> ?SLOG(info, #{ reason => Reason, input_packet => Packet }), - erlang:error(?FRAME_SERIALIZE_ERROR(Reason)); + erlang:error({?FRAME_SERIALIZE_ERROR, Reason}); error:Reason:Stacktrace -> ?SLOG(error, #{ input_packet => Packet, exception => Reason, stacktrace => Stacktrace }), - erlang:error(frame_serialize_error) + erlang:error(?FRAME_SERIALIZE_ERROR) end end. diff --git a/apps/emqx/test/emqx_frame_SUITE.erl b/apps/emqx/test/emqx_frame_SUITE.erl index 488cf89f6..99f815abf 100644 --- a/apps/emqx/test/emqx_frame_SUITE.erl +++ b/apps/emqx/test/emqx_frame_SUITE.erl @@ -24,7 +24,7 @@ -include_lib("common_test/include/ct.hrl"). -define(ASSERT_FRAME_THROW(Reason, Expr), - ?assertThrow(?FRAME_PARSE_ERROR(Reason), Expr) + ?assertThrow({?FRAME_PARSE_ERROR, Reason}, Expr) ). all() -> diff --git a/apps/emqx/test/emqx_pmon_SUITE.erl b/apps/emqx/test/emqx_pmon_SUITE.erl index a47542bf7..9ab0e6231 100644 --- a/apps/emqx/test/emqx_pmon_SUITE.erl +++ b/apps/emqx/test/emqx_pmon_SUITE.erl @@ -60,5 +60,33 @@ t_erase(_) -> ?assertEqual([{self(), val}], Items), ?assertEqual(0, emqx_pmon:count(PMon3)). -% t_erase_all(_) -> -% error('TODO'). +t_demonitor(_) -> + Pid = self(), + Ref1 = erlang:monitor(process, Pid), + Ref2 = erlang:monitor(process, spawn(fun() -> ok end)), + Ref3 = erlang:make_ref(), + ok = emqx_pmon:demonitor(Ref1), + ?assertNot(erlang:demonitor(Ref1, [info])), + ok = emqx_pmon:demonitor(Ref2), + % demonitor twice + ok = emqx_pmon:demonitor(Ref2), + ?assertNot(erlang:demonitor(Ref2, [info])), + % not a monitor ref, should return ok + ok = emqx_pmon:demonitor(Ref3), + ?assertNot(erlang:demonitor(Ref3, [info])), + Pid2 = spawn(fun() -> + receive + stop -> + exit(normal) + end + end), + Ref4 = erlang:monitor(process, Pid2), + Ref5 = erlang:monitor(process, Pid2), + ok = emqx_pmon:demonitor(Ref4), + ?assertNot(erlang:demonitor(Ref4, [info])), + _ = erlang:send(Pid2, stop), + receive + {'DOWN', Ref, process, Pid2, normal} -> + ?assertEqual(Ref, Ref5) + end, + ok.