From ad9e0fc311478c8d3f9285dc27e8aab008fc6385 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 15 Dec 2019 18:58:29 +0800 Subject: [PATCH] Support the 'clean_start = false' websocket connection --- src/emqx_ws_connection.erl | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index a67d5717c..77c517aeb 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -305,20 +305,17 @@ websocket_info({timeout, TRef, limit_timeout}, websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> handle_timeout(TRef, Msg, State); -websocket_info(Close = {close, _Reason}, State) -> - handle_info(Close, State); - websocket_info({shutdown, Reason}, State) -> shutdown(Reason, State); websocket_info({stop, Reason}, State) -> - stop(Reason, State); + shutdown(Reason, State); websocket_info(Info, State) -> handle_info(Info, State). websocket_close(Reason, State) -> - ?LOG(debug, "WebSocket closed due to ~p~n", [Reason]), + ?LOG(debug, "Websocket closed due to ~p~n", [Reason]), handle_info({sock_closed, Reason}, State). terminate(Error, _Req, #state{channel = Channel, stop_reason = Reason}) -> @@ -359,8 +356,8 @@ handle_info({connack, ConnAck}, State) -> return(enqueue(ConnAck, State)); handle_info({close, Reason}, State) -> - %% TODO: close ws conn? - shutdown(Reason, State); + ?LOG(debug, "Force to close the socket due to ~p", [Reason]), + return(enqueue(close, State)); handle_info({event, connected}, State = #state{channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), @@ -590,7 +587,7 @@ ensure_stats_timer(State = #state{idle_timeout = Timeout, State#state{stats_timer = start_timer(Timeout, emit_stats)}; ensure_stats_timer(State) -> State. --compile({inline, [enqueue/2, return/1, shutdown/2, stop/2]}). +-compile({inline, [postpone/2, enqueue/2, return/1, shutdown/2]}). %%-------------------------------------------------------------------- %% Postpone the packet, cmd or event @@ -612,6 +609,10 @@ enqueue(Packets, State = #state{postponed = Postponed}) enqueue(Other, State = #state{postponed = Postponed}) -> State#state{postponed = [Other|Postponed]}. +shutdown(Reason, State = #state{postponed = Postponed}) -> + Postponed1 = [{shutdown, Reason}|Postponed], + return(State#state{postponed = Postponed1, stop_reason = Reason}). + return(State = #state{postponed = []}) -> {ok, State}; return(State = #state{postponed = Postponed}) -> @@ -620,10 +621,10 @@ return(State = #state{postponed = Postponed}) -> State1 = State#state{postponed = []}, case {Packets, Cmds} of {[], []} -> {ok, State1}; - {[], Cmds} -> {reply, Cmds, State1}; + {[], Cmds} -> {Cmds, State1}; {Packets, Cmds} -> - {Reply, State2} = handle_outgoing(Packets, State1), - {reply, [Reply|Cmds], State2} + {Frame, State2} = handle_outgoing(Packets, State1), + {[Frame|Cmds], State2} end. classify([], Packets, Cmds, Events) -> @@ -633,6 +634,8 @@ classify([Packet|More], Packets, Cmds, Events) classify(More, [Packet|Packets], Cmds, Events); classify([Cmd = {active, _}|More], Packets, Cmds, Events) -> classify(More, Packets, [Cmd|Cmds], Events); +classify([Cmd = {shutdown, _Reason}|More], Packets, Cmds, Events) -> + classify(More, Packets, [Cmd|Cmds], Events); classify([Cmd = close|More], Packets, Cmds, Events) -> classify(More, Packets, [Cmd|Cmds], Events); classify([Event|More], Packets, Cmds, Events) -> @@ -640,15 +643,6 @@ classify([Event|More], Packets, Cmds, Events) -> trigger(Event) -> erlang:send(self(), Event). -shutdown(Reason, State) -> - stop({shutdown, Reason}, State). - -stop(Reason, State = #state{postponed = []}) -> - {stop, State#state{stop_reason = Reason}}; -stop(Reason, State = #state{postponed = Postponed}) -> - return(State#state{postponed = [close|Postponed], - stop_reason = Reason}). - %%-------------------------------------------------------------------- %% For CT tests %%--------------------------------------------------------------------