Shutdown the connection if no more data received
This commit is contained in:
parent
3c8de09ba3
commit
f194f92418
|
@ -55,7 +55,7 @@
|
||||||
%% Unused fields: connname, peerhost, peerport
|
%% Unused fields: connname, peerhost, peerport
|
||||||
-record(client_state, {connection, peername, conn_state, await_recv,
|
-record(client_state, {connection, peername, conn_state, await_recv,
|
||||||
rate_limit, packet_size, parser, proto_state,
|
rate_limit, packet_size, parser, proto_state,
|
||||||
keepalive, enable_stats, force_gc_count}).
|
keepalive, enable_stats, idle_timeout, force_gc_count}).
|
||||||
|
|
||||||
-define(INFO_KEYS, [peername, conn_state, await_recv]).
|
-define(INFO_KEYS, [peername, conn_state, await_recv]).
|
||||||
|
|
||||||
|
@ -114,6 +114,7 @@ do_init(Conn, Env, Peername) ->
|
||||||
Parser = emqttd_parser:initial_state(PacketSize),
|
Parser = emqttd_parser:initial_state(PacketSize),
|
||||||
ProtoState = emqttd_protocol:init(Conn, Peername, SendFun, Env),
|
ProtoState = emqttd_protocol:init(Conn, Peername, SendFun, Env),
|
||||||
EnableStats = get_value(client_enable_stats, Env, false),
|
EnableStats = get_value(client_enable_stats, Env, false),
|
||||||
|
IdleTimout = get_value(client_idle_timeout, Env, 30000),
|
||||||
ForceGcCount = emqttd_gc:conn_max_gc_count(),
|
ForceGcCount = emqttd_gc:conn_max_gc_count(),
|
||||||
State = run_socket(#client_state{connection = Conn,
|
State = run_socket(#client_state{connection = Conn,
|
||||||
peername = Peername,
|
peername = Peername,
|
||||||
|
@ -124,8 +125,8 @@ do_init(Conn, Env, Peername) ->
|
||||||
parser = Parser,
|
parser = Parser,
|
||||||
proto_state = ProtoState,
|
proto_state = ProtoState,
|
||||||
enable_stats = EnableStats,
|
enable_stats = EnableStats,
|
||||||
|
idle_timeout = IdleTimout,
|
||||||
force_gc_count = ForceGcCount}),
|
force_gc_count = ForceGcCount}),
|
||||||
IdleTimout = get_value(client_idle_timeout, Env, 30000),
|
|
||||||
gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
|
gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
|
||||||
{backoff, 2000, 2000, 20000}).
|
{backoff, 2000, 2000, 20000}).
|
||||||
|
|
||||||
|
@ -275,9 +276,11 @@ handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?UNEXPECTED_INFO(Info, State).
|
?UNEXPECTED_INFO(Info, State).
|
||||||
|
|
||||||
terminate(Reason, #client_state{connection = Conn,
|
terminate(Reason, State = #client_state{connection = Conn,
|
||||||
keepalive = KeepAlive,
|
keepalive = KeepAlive,
|
||||||
proto_state = ProtoState}) ->
|
proto_state = ProtoState}) ->
|
||||||
|
|
||||||
|
?LOG(debug, "Terminated for ~p", [Reason], State),
|
||||||
Conn:fast_close(),
|
Conn:fast_close(),
|
||||||
emqttd_keepalive:cancel(KeepAlive),
|
emqttd_keepalive:cancel(KeepAlive),
|
||||||
case {ProtoState, Reason} of
|
case {ProtoState, Reason} of
|
||||||
|
@ -300,12 +303,13 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
received(<<>>, State) ->
|
received(<<>>, State) ->
|
||||||
{noreply, gc(State), hibernate};
|
{noreply, gc(State), hibernate};
|
||||||
|
|
||||||
received(Bytes, State = #client_state{parser = Parser,
|
received(Bytes, State = #client_state{parser = Parser,
|
||||||
packet_size = PacketSize,
|
packet_size = PacketSize,
|
||||||
proto_state = ProtoState}) ->
|
proto_state = ProtoState,
|
||||||
|
idle_timeout = IdleTimeout}) ->
|
||||||
case catch emqttd_parser:parse(Bytes, Parser) of
|
case catch emqttd_parser:parse(Bytes, Parser) of
|
||||||
{more, NewParser} ->
|
{more, NewParser} ->
|
||||||
{noreply, run_socket(State#client_state{parser = NewParser}), hibernate};
|
{noreply, run_socket(State#client_state{parser = NewParser}), IdleTimeout};
|
||||||
{ok, Packet, Rest} ->
|
{ok, Packet, Rest} ->
|
||||||
emqttd_metrics:received(Packet),
|
emqttd_metrics:received(Packet),
|
||||||
case emqttd_protocol:received(Packet, ProtoState) of
|
case emqttd_protocol:received(Packet, ProtoState) of
|
||||||
|
|
Loading…
Reference in New Issue