Support the 'clean_start = false' websocket connection
This commit is contained in:
parent
2ef52828bc
commit
ad9e0fc311
|
@ -305,20 +305,17 @@ websocket_info({timeout, TRef, limit_timeout},
|
||||||
websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
|
websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
|
||||||
handle_timeout(TRef, Msg, State);
|
handle_timeout(TRef, Msg, State);
|
||||||
|
|
||||||
websocket_info(Close = {close, _Reason}, State) ->
|
|
||||||
handle_info(Close, State);
|
|
||||||
|
|
||||||
websocket_info({shutdown, Reason}, State) ->
|
websocket_info({shutdown, Reason}, State) ->
|
||||||
shutdown(Reason, State);
|
shutdown(Reason, State);
|
||||||
|
|
||||||
websocket_info({stop, Reason}, State) ->
|
websocket_info({stop, Reason}, State) ->
|
||||||
stop(Reason, State);
|
shutdown(Reason, State);
|
||||||
|
|
||||||
websocket_info(Info, State) ->
|
websocket_info(Info, State) ->
|
||||||
handle_info(Info, State).
|
handle_info(Info, State).
|
||||||
|
|
||||||
websocket_close(Reason, State) ->
|
websocket_close(Reason, State) ->
|
||||||
?LOG(debug, "WebSocket closed due to ~p~n", [Reason]),
|
?LOG(debug, "Websocket closed due to ~p~n", [Reason]),
|
||||||
handle_info({sock_closed, Reason}, State).
|
handle_info({sock_closed, Reason}, State).
|
||||||
|
|
||||||
terminate(Error, _Req, #state{channel = Channel, stop_reason = Reason}) ->
|
terminate(Error, _Req, #state{channel = Channel, stop_reason = Reason}) ->
|
||||||
|
@ -359,8 +356,8 @@ handle_info({connack, ConnAck}, State) ->
|
||||||
return(enqueue(ConnAck, State));
|
return(enqueue(ConnAck, State));
|
||||||
|
|
||||||
handle_info({close, Reason}, State) ->
|
handle_info({close, Reason}, State) ->
|
||||||
%% TODO: close ws conn?
|
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
|
||||||
shutdown(Reason, State);
|
return(enqueue(close, State));
|
||||||
|
|
||||||
handle_info({event, connected}, State = #state{channel = Channel}) ->
|
handle_info({event, connected}, State = #state{channel = Channel}) ->
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
|
@ -590,7 +587,7 @@ ensure_stats_timer(State = #state{idle_timeout = Timeout,
|
||||||
State#state{stats_timer = start_timer(Timeout, emit_stats)};
|
State#state{stats_timer = start_timer(Timeout, emit_stats)};
|
||||||
ensure_stats_timer(State) -> State.
|
ensure_stats_timer(State) -> State.
|
||||||
|
|
||||||
-compile({inline, [enqueue/2, return/1, shutdown/2, stop/2]}).
|
-compile({inline, [postpone/2, enqueue/2, return/1, shutdown/2]}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Postpone the packet, cmd or event
|
%% Postpone the packet, cmd or event
|
||||||
|
@ -612,6 +609,10 @@ enqueue(Packets, State = #state{postponed = Postponed})
|
||||||
enqueue(Other, State = #state{postponed = Postponed}) ->
|
enqueue(Other, State = #state{postponed = Postponed}) ->
|
||||||
State#state{postponed = [Other|Postponed]}.
|
State#state{postponed = [Other|Postponed]}.
|
||||||
|
|
||||||
|
shutdown(Reason, State = #state{postponed = Postponed}) ->
|
||||||
|
Postponed1 = [{shutdown, Reason}|Postponed],
|
||||||
|
return(State#state{postponed = Postponed1, stop_reason = Reason}).
|
||||||
|
|
||||||
return(State = #state{postponed = []}) ->
|
return(State = #state{postponed = []}) ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
return(State = #state{postponed = Postponed}) ->
|
return(State = #state{postponed = Postponed}) ->
|
||||||
|
@ -620,10 +621,10 @@ return(State = #state{postponed = Postponed}) ->
|
||||||
State1 = State#state{postponed = []},
|
State1 = State#state{postponed = []},
|
||||||
case {Packets, Cmds} of
|
case {Packets, Cmds} of
|
||||||
{[], []} -> {ok, State1};
|
{[], []} -> {ok, State1};
|
||||||
{[], Cmds} -> {reply, Cmds, State1};
|
{[], Cmds} -> {Cmds, State1};
|
||||||
{Packets, Cmds} ->
|
{Packets, Cmds} ->
|
||||||
{Reply, State2} = handle_outgoing(Packets, State1),
|
{Frame, State2} = handle_outgoing(Packets, State1),
|
||||||
{reply, [Reply|Cmds], State2}
|
{[Frame|Cmds], State2}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
classify([], Packets, Cmds, Events) ->
|
classify([], Packets, Cmds, Events) ->
|
||||||
|
@ -633,6 +634,8 @@ classify([Packet|More], Packets, Cmds, Events)
|
||||||
classify(More, [Packet|Packets], Cmds, Events);
|
classify(More, [Packet|Packets], Cmds, Events);
|
||||||
classify([Cmd = {active, _}|More], Packets, Cmds, Events) ->
|
classify([Cmd = {active, _}|More], Packets, Cmds, Events) ->
|
||||||
classify(More, Packets, [Cmd|Cmds], Events);
|
classify(More, Packets, [Cmd|Cmds], Events);
|
||||||
|
classify([Cmd = {shutdown, _Reason}|More], Packets, Cmds, Events) ->
|
||||||
|
classify(More, Packets, [Cmd|Cmds], Events);
|
||||||
classify([Cmd = close|More], Packets, Cmds, Events) ->
|
classify([Cmd = close|More], Packets, Cmds, Events) ->
|
||||||
classify(More, Packets, [Cmd|Cmds], Events);
|
classify(More, Packets, [Cmd|Cmds], Events);
|
||||||
classify([Event|More], Packets, Cmds, Events) ->
|
classify([Event|More], Packets, Cmds, Events) ->
|
||||||
|
@ -640,15 +643,6 @@ classify([Event|More], Packets, Cmds, Events) ->
|
||||||
|
|
||||||
trigger(Event) -> erlang:send(self(), Event).
|
trigger(Event) -> erlang:send(self(), Event).
|
||||||
|
|
||||||
shutdown(Reason, State) ->
|
|
||||||
stop({shutdown, Reason}, State).
|
|
||||||
|
|
||||||
stop(Reason, State = #state{postponed = []}) ->
|
|
||||||
{stop, State#state{stop_reason = Reason}};
|
|
||||||
stop(Reason, State = #state{postponed = Postponed}) ->
|
|
||||||
return(State#state{postponed = [close|Postponed],
|
|
||||||
stop_reason = Reason}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% For CT tests
|
%% For CT tests
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue