fix issue #561 - websocket client's process may leak when no mqtt packets received
This commit is contained in:
parent
02a17a8e96
commit
686574913c
|
@ -52,13 +52,14 @@ handle_request('POST', "/mqtt/publish", Req) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% MQTT Over WebSocket
|
%% MQTT Over WebSocket
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
handle_request('GET', "/mqtt", Req) ->
|
handle_request('GET', "/mqtt", Req) ->
|
||||||
lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
|
lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
|
||||||
Upgrade = Req:get_header_value("Upgrade"),
|
Upgrade = Req:get_header_value("Upgrade"),
|
||||||
Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
|
Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
|
||||||
case {is_websocket(Upgrade), Proto} of
|
case {is_websocket(Upgrade), Proto} of
|
||||||
{true, "mqtt" ++ _Vsn} ->
|
{true, "mqtt" ++ _Vsn} ->
|
||||||
emqttd_ws_client:start_link(Req);
|
emqttd_ws:handle_request(Req);
|
||||||
{false, _} ->
|
{false, _} ->
|
||||||
lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
|
lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
|
||||||
Req:respond({400, [], <<"Bad Request">>});
|
Req:respond({400, [], <<"Bad Request">>});
|
||||||
|
@ -144,7 +145,6 @@ authorized(Req) ->
|
||||||
user_passwd(BasicAuth) ->
|
user_passwd(BasicAuth) ->
|
||||||
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
|
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
|
||||||
|
|
||||||
|
|
||||||
int(S) -> list_to_integer(S).
|
int(S) -> list_to_integer(S).
|
||||||
|
|
||||||
bool("0") -> false;
|
bool("0") -> false;
|
||||||
|
|
|
@ -36,8 +36,7 @@ new(Opts) ->
|
||||||
fun(Bin) -> parse(Bin, {none, limit(Opts)}) end.
|
fun(Bin) -> parse(Bin, {none, limit(Opts)}) end.
|
||||||
|
|
||||||
limit(Opts) ->
|
limit(Opts) ->
|
||||||
#mqtt_packet_limit{max_packet_size =
|
#mqtt_packet_limit{max_packet_size = proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
|
||||||
proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
|
|
||||||
|
|
||||||
%% @doc Parse MQTT Packet
|
%% @doc Parse MQTT Packet
|
||||||
-spec(parse(binary(), {none, [option()]} | fun())
|
-spec(parse(binary(), {none, [option()]} | fun())
|
||||||
|
|
|
@ -0,0 +1,75 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqttd_ws).
|
||||||
|
|
||||||
|
-export([handle_request/1, ws_loop/3]).
|
||||||
|
|
||||||
|
%% WebSocket Loop State
|
||||||
|
-record(wsocket_state, {peer, client_pid, packet_opts, parser_fun}).
|
||||||
|
|
||||||
|
-define(LOG(Level, Peer, Format, Args),
|
||||||
|
lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Handle WebSocket Request
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc Handle WebSocket Request.
|
||||||
|
handle_request(Req) ->
|
||||||
|
Peer = Req:get(peer),
|
||||||
|
PktOpts = emqttd:env(mqtt, packet),
|
||||||
|
ParserFun = emqttd_parser:new(PktOpts),
|
||||||
|
{ReentryWs, ReplyChannel} = upgrade(Req),
|
||||||
|
{ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
|
||||||
|
ReentryWs(#wsocket_state{peer = Peer, client_pid = ClientPid,
|
||||||
|
packet_opts = PktOpts, parser_fun = ParserFun}).
|
||||||
|
|
||||||
|
%% @doc Upgrade WebSocket.
|
||||||
|
%% @private
|
||||||
|
upgrade(Req) ->
|
||||||
|
mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Receive Loop
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc WebSocket frame receive loop.
|
||||||
|
ws_loop(<<>>, State, _ReplyChannel) ->
|
||||||
|
State;
|
||||||
|
ws_loop([<<>>], State, _ReplyChannel) ->
|
||||||
|
State;
|
||||||
|
ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid,
|
||||||
|
parser_fun = ParserFun}, ReplyChannel) ->
|
||||||
|
?LOG(debug, Peer, "RECV ~p", [Data]),
|
||||||
|
case catch ParserFun(iolist_to_binary(Data)) of
|
||||||
|
{more, NewParser} ->
|
||||||
|
State#wsocket_state{parser_fun = NewParser};
|
||||||
|
{ok, Packet, Rest} ->
|
||||||
|
gen_server:cast(ClientPid, {received, Packet}),
|
||||||
|
ws_loop(Rest, reset_parser(State), ReplyChannel);
|
||||||
|
{error, Error} ->
|
||||||
|
?LOG(error, Peer, "Frame error: ~p", [Error]),
|
||||||
|
exit({shutdown, Error});
|
||||||
|
{'EXIT', Reason} ->
|
||||||
|
?LOG(error, Peer, "Frame error: ~p", [Reason]),
|
||||||
|
?LOG(error, Peer, "Error data: ~p", [Data]),
|
||||||
|
exit({shutdown, parser_error})
|
||||||
|
end.
|
||||||
|
|
||||||
|
reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
|
||||||
|
State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}.
|
||||||
|
|
|
@ -16,42 +16,31 @@
|
||||||
|
|
||||||
-module(emqttd_ws_client).
|
-module(emqttd_ws_client).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_protocol.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
%% API Exports
|
%% API Exports
|
||||||
-export([start_link/1, ws_loop/3, session/1, info/1, kick/1]).
|
-export([start_link/4, session/1, info/1, kick/1]).
|
||||||
|
|
||||||
%% SUB/UNSUB Asynchronously
|
%% SUB/UNSUB Asynchronously
|
||||||
-export([subscribe/2, unsubscribe/2]).
|
-export([subscribe/2, unsubscribe/2]).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
%% WebSocket Loop State
|
|
||||||
-record(wsocket_state, {request, client_pid, packet_opts, parser_fun}).
|
|
||||||
|
|
||||||
%% WebSocket Client State
|
%% WebSocket Client State
|
||||||
-record(wsclient_state, {ws_pid, request, proto_state, keepalive}).
|
-record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive}).
|
||||||
|
|
||||||
-define(WSLOG(Level, Format, Args, Req),
|
-define(WSLOG(Level, Peer, Format, Args),
|
||||||
lager:Level("WsClient(~s): " ++ Format, [Req:get(peer) | Args])).
|
lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
|
||||||
|
|
||||||
%% @doc Start WebSocket client.
|
%% @doc Start WebSocket Client.
|
||||||
start_link(Req) ->
|
start_link(MqttEnv, WsPid, Req, ReplyChannel) ->
|
||||||
PktOpts = emqttd:env(mqtt, packet),
|
gen_server:start_link(?MODULE, [MqttEnv, WsPid, Req, ReplyChannel], []).
|
||||||
ParserFun = emqttd_parser:new(PktOpts),
|
|
||||||
{ReentryWs, ReplyChannel} = upgrade(Req),
|
|
||||||
Params = [self(), Req, ReplyChannel, PktOpts],
|
|
||||||
{ok, ClientPid} = gen_server:start_link(?MODULE, Params, []),
|
|
||||||
ReentryWs(#wsocket_state{request = Req,
|
|
||||||
client_pid = ClientPid,
|
|
||||||
packet_opts = PktOpts,
|
|
||||||
parser_fun = ParserFun}).
|
|
||||||
|
|
||||||
session(CPid) ->
|
session(CPid) ->
|
||||||
gen_server:call(CPid, session, infinity).
|
gen_server:call(CPid, session, infinity).
|
||||||
|
@ -68,66 +57,40 @@ subscribe(CPid, TopicTable) ->
|
||||||
unsubscribe(CPid, Topics) ->
|
unsubscribe(CPid, Topics) ->
|
||||||
gen_server:cast(CPid, {unsubscribe, Topics}).
|
gen_server:cast(CPid, {unsubscribe, Topics}).
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Upgrade WebSocket.
|
|
||||||
upgrade(Req) ->
|
|
||||||
mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3).
|
|
||||||
|
|
||||||
%% @doc WebSocket frame receive loop.
|
|
||||||
ws_loop(<<>>, State, _ReplyChannel) ->
|
|
||||||
State;
|
|
||||||
ws_loop([<<>>], State, _ReplyChannel) ->
|
|
||||||
State;
|
|
||||||
ws_loop(Data, State = #wsocket_state{request = Req,
|
|
||||||
client_pid = ClientPid,
|
|
||||||
parser_fun = ParserFun}, ReplyChannel) ->
|
|
||||||
?WSLOG(debug, "RECV ~p", [Data], Req),
|
|
||||||
case catch ParserFun(iolist_to_binary(Data)) of
|
|
||||||
{more, NewParser} ->
|
|
||||||
State#wsocket_state{parser_fun = NewParser};
|
|
||||||
{ok, Packet, Rest} ->
|
|
||||||
gen_server:cast(ClientPid, {received, Packet}),
|
|
||||||
ws_loop(Rest, reset_parser(State), ReplyChannel);
|
|
||||||
{error, Error} ->
|
|
||||||
?WSLOG(error, "Frame error: ~p", [Error], Req),
|
|
||||||
exit({shutdown, Error});
|
|
||||||
{'EXIT', Reason} ->
|
|
||||||
?WSLOG(error, "Frame error: ~p", [Reason], Req),
|
|
||||||
?WSLOG(error, "Error data: ~p", [Data], Req),
|
|
||||||
exit({shutdown, parser_error})
|
|
||||||
end.
|
|
||||||
|
|
||||||
reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
|
|
||||||
State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server Callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init([WsPid, Req, ReplyChannel, PktOpts]) ->
|
init([MqttEnv, WsPid, Req, ReplyChannel]) ->
|
||||||
%%issue#413: trap_exit is unnecessary
|
true = link(WsPid),
|
||||||
%%process_flag(trap_exit, true),
|
|
||||||
{ok, Peername} = Req:get(peername),
|
{ok, Peername} = Req:get(peername),
|
||||||
|
Headers = mochiweb_headers:to_list(
|
||||||
|
mochiweb_request:get(headers, Req)),
|
||||||
|
PktOpts = proplists:get_value(packet, MqttEnv),
|
||||||
SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
|
SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
|
||||||
Headers = mochiweb_request:get(headers, Req),
|
|
||||||
HeadersList = mochiweb_headers:to_list(Headers),
|
|
||||||
ProtoState = emqttd_protocol:init(Peername, SendFun,
|
ProtoState = emqttd_protocol:init(Peername, SendFun,
|
||||||
[{ws_initial_headers, HeadersList} | PktOpts]),
|
[{ws_initial_headers, Headers} | PktOpts]),
|
||||||
{ok, #wsclient_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
|
{ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer),
|
||||||
|
connection = Req:get(connection),
|
||||||
|
proto_state = ProtoState}, idle_timeout(MqttEnv)}.
|
||||||
|
|
||||||
|
idle_timeout(MqttEnv) ->
|
||||||
|
ClientOpts = proplists:get_value(client, MqttEnv),
|
||||||
|
timer:seconds(proplists:get_value(idle_timeout, ClientOpts, 10)).
|
||||||
|
|
||||||
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
|
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
|
||||||
{reply, emqttd_protocol:session(ProtoState), State};
|
{reply, emqttd_protocol:session(ProtoState), State};
|
||||||
|
|
||||||
handle_call(info, _From, State = #wsclient_state{request = Req,
|
handle_call(info, _From, State = #wsclient_state{peer = Peer,
|
||||||
proto_state = ProtoState}) ->
|
proto_state = ProtoState}) ->
|
||||||
ProtoInfo = emqttd_protocol:info(ProtoState),
|
ProtoInfo = emqttd_protocol:info(ProtoState),
|
||||||
{reply, [{websocket, true}, {peer, Req:get(peer)}| ProtoInfo], State};
|
{reply, [{websocket, true}, {peer, Peer}| ProtoInfo], State};
|
||||||
|
|
||||||
handle_call(kick, _From, State) ->
|
handle_call(kick, _From, State) ->
|
||||||
{stop, {shutdown, kick}, ok, State};
|
{stop, {shutdown, kick}, ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State = #wsclient_state{request = HttpReq}) ->
|
handle_call(Req, _From, State = #wsclient_state{peer = Peer}) ->
|
||||||
?WSLOG(critical, "Unexpected request: ~p", [Req], HttpReq),
|
?WSLOG(critical, Peer, "Unexpected request: ~p", [Req]),
|
||||||
{reply, {error, unsupported_request}, State}.
|
{reply, {error, unsupported_request}, State}.
|
||||||
|
|
||||||
handle_cast({subscribe, TopicTable}, State) ->
|
handle_cast({subscribe, TopicTable}, State) ->
|
||||||
|
@ -140,13 +103,12 @@ handle_cast({unsubscribe, Topics}, State) ->
|
||||||
emqttd_session:unsubscribe(SessPid, Topics)
|
emqttd_session:unsubscribe(SessPid, Topics)
|
||||||
end, State);
|
end, State);
|
||||||
|
|
||||||
handle_cast({received, Packet}, State = #wsclient_state{request = Req,
|
handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
|
||||||
proto_state = ProtoState}) ->
|
|
||||||
case emqttd_protocol:received(Packet, ProtoState) of
|
case emqttd_protocol:received(Packet, ProtoState) of
|
||||||
{ok, ProtoState1} ->
|
{ok, ProtoState1} ->
|
||||||
noreply(State#wsclient_state{proto_state = ProtoState1});
|
noreply(State#wsclient_state{proto_state = ProtoState1});
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?WSLOG(error, "Protocol error - ~p", [Error], Req),
|
?WSLOG(error, Peer, "Protocol error - ~p", [Error]),
|
||||||
shutdown(Error, State);
|
shutdown(Error, State);
|
||||||
{error, Error, ProtoState1} ->
|
{error, Error, ProtoState1} ->
|
||||||
shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
|
shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
|
||||||
|
@ -154,9 +116,12 @@ handle_cast({received, Packet}, State = #wsclient_state{request = Req,
|
||||||
stop(Reason, State#wsclient_state{proto_state = ProtoState1})
|
stop(Reason, State#wsclient_state{proto_state = ProtoState1})
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_cast(Msg, State = #wsclient_state{request = Req}) ->
|
handle_cast(Msg, State = #wsclient_state{peer = Peer}) ->
|
||||||
?WSLOG(critical, "Unexpected msg: ~p", [Msg], Req),
|
?WSLOG(critical, Peer, "Unexpected msg: ~p", [Msg]),
|
||||||
{noreply, State}.
|
noreply(State).
|
||||||
|
|
||||||
|
handle_info(timeout, State) ->
|
||||||
|
shutdown(idle_timeout, State);
|
||||||
|
|
||||||
handle_info({suback, PacketId, GrantedQos}, State) ->
|
handle_info({suback, PacketId, GrantedQos}, State) ->
|
||||||
with_proto_state(fun(ProtoState) ->
|
with_proto_state(fun(ProtoState) ->
|
||||||
|
@ -174,13 +139,12 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
|
||||||
emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState)
|
emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState)
|
||||||
end, State);
|
end, State);
|
||||||
|
|
||||||
handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{request = Req}) ->
|
handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) ->
|
||||||
?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], Req),
|
?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
|
||||||
shutdown(conflict, State);
|
shutdown(conflict, State);
|
||||||
|
|
||||||
handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}) ->
|
handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, connection = Conn}) ->
|
||||||
?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], Req),
|
?WSLOG(debug, Peer, "Keepalive at the interval of ~p", [Interval]),
|
||||||
Conn = Req:get(connection),
|
|
||||||
StatFun = fun() ->
|
StatFun = fun() ->
|
||||||
case Conn:getstat([recv_oct]) of
|
case Conn:getstat([recv_oct]) of
|
||||||
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
||||||
|
@ -190,25 +154,21 @@ handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}
|
||||||
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
|
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
|
||||||
noreply(State#wsclient_state{keepalive = KeepAlive});
|
noreply(State#wsclient_state{keepalive = KeepAlive});
|
||||||
|
|
||||||
handle_info({keepalive, check}, State = #wsclient_state{request = Req,
|
handle_info({keepalive, check}, State = #wsclient_state{peer = Peer,
|
||||||
keepalive = KeepAlive}) ->
|
keepalive = KeepAlive}) ->
|
||||||
case emqttd_keepalive:check(KeepAlive) of
|
case emqttd_keepalive:check(KeepAlive) of
|
||||||
{ok, KeepAlive1} ->
|
{ok, KeepAlive1} ->
|
||||||
noreply(State#wsclient_state{keepalive = KeepAlive1});
|
noreply(State#wsclient_state{keepalive = KeepAlive1});
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
?WSLOG(debug, "Keepalive Timeout!", [], Req),
|
?WSLOG(debug, Peer, "Keepalive Timeout!", []),
|
||||||
shutdown(keepalive_timeout, State);
|
shutdown(keepalive_timeout, State);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?WSLOG(warning, "Keepalive error - ~p", [Error], Req),
|
?WSLOG(warning, Peer, "Keepalive error - ~p", [Error]),
|
||||||
shutdown(keepalive_error, State)
|
shutdown(keepalive_error, State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%%issue#413: removed the trap_exit flag
|
handle_info(Info, State = #wsclient_state{peer = Peer}) ->
|
||||||
%%handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) ->
|
?WSLOG(critical, Peer, "Unexpected Info: ~p", [Info]),
|
||||||
%% stop(Reason, State);
|
|
||||||
|
|
||||||
handle_info(Info, State = #wsclient_state{request = Req}) ->
|
|
||||||
?WSLOG(critical, "Unexpected Info: ~p", [Info], Req),
|
|
||||||
noreply(State).
|
noreply(State).
|
||||||
|
|
||||||
terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
|
terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
|
||||||
|
|
Loading…
Reference in New Issue