diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 41a343002..69ffddf06 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -56,7 +56,7 @@ ]). %% Internal callback --export([wakeup_from_hib/2]). +-export([wakeup_from_hib/2, recvloop/2]). %% Export for CT -export([set_field/3]). @@ -284,15 +284,22 @@ recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> handle_recv({system, From, Request}, Parent, State) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State); handle_recv({'EXIT', Parent, Reason}, Parent, State) -> + %% FIXME: it's not trapping exit, should never receive an EXIT terminate(Reason, State); handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) -> - process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State)). + case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of + {ok, NewState} -> + ?MODULE:recvloop(Parent, NewState); + {stop, Reason, NewSate} -> + terminate(Reason, NewSate) + end. hibernate(Parent, State) -> proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]). %% Maybe do something here later. -wakeup_from_hib(Parent, State) -> recvloop(Parent, State). +wakeup_from_hib(Parent, State) -> + ?MODULE:recvloop(Parent, State). %%-------------------------------------------------------------------- %% Ensure/cancel stats timer @@ -311,22 +318,31 @@ cancel_stats_timer(State) -> State. %%-------------------------------------------------------------------- %% Process next Msg -process_msg([], Parent, State) -> recvloop(Parent, State); - -process_msg([Msg|More], Parent, State) -> - case catch handle_msg(Msg, State) of - ok -> - process_msg(More, Parent, State); - {ok, NState} -> - process_msg(More, Parent, NState); - {ok, Msgs, NState} -> - process_msg(append_msg(More, Msgs), Parent, NState); - {stop, Reason} -> - terminate(Reason, State); - {stop, Reason, NState} -> - terminate(Reason, NState); - {'EXIT', Reason} -> - terminate(Reason, State) +process_msg([], State) -> + {ok, State}; +process_msg([Msg|More], State) -> + try + case handle_msg(Msg, State) of + ok -> + process_msg(More, State); + {ok, NState} -> + process_msg(More, NState); + {ok, Msgs, NState} -> + process_msg(append_msg(More, Msgs), NState); + {stop, Reason, NState} -> + {stop, Reason, NState} + end + catch + exit : normal -> + {stop, normal, State}; + exit : shutdown -> + {stop, shutdown, State}; + exit : {shutdown, _} = Shutdown -> + {stop, Shutdown, State}; + Exception : Context : Stack -> + {stop, #{exception => Exception, + context => Context, + stacktrace => Stack}, State} end. -compile({inline, [append_msg/2]}). @@ -450,18 +466,37 @@ handle_msg(Msg, State) -> -spec terminate(any(), state()) -> no_return(). terminate(Reason, State = #state{channel = Channel, transport = Transport, socket = Socket}) -> - ?tp(debug, terminate, #{reason => Reason}), - Channel1 = emqx_channel:set_conn_state(disconnected, Channel), - emqx_congestion:cancel_alarms(Socket, Transport, Channel1), - emqx_channel:terminate(Reason, Channel1), + try + Channel1 = emqx_channel:set_conn_state(disconnected, Channel), + emqx_congestion:cancel_alarms(Socket, Transport, Channel1), + emqx_channel:terminate(Reason, Channel1), + close_socket_ok(State) + catch + E : C : S -> + ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) + end, + ?tp(debug, terminate, #{}), + maybe_raise_excption(Reason). + +%% close socket, discard new state, always return ok. +close_socket_ok(State) -> _ = close_socket(State), + ok. + +%% tell truth about the original exception +maybe_raise_excption(#{exception := Exception, + context := Context, + stacktrace := Stacktrace + }) -> + erlang:raise(Exception, Context, Stacktrace); +maybe_raise_excption(Reason) -> exit(Reason). %%-------------------------------------------------------------------- %% Sys callbacks system_continue(Parent, _Debug, State) -> - recvloop(Parent, State). + ?MODULE:recvloop(Parent, State). system_terminate(Reason, _Parent, _Debug, State) -> terminate(Reason, State).