From f194f924180ea43c0acfbd74d13e05e91fe40ce7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Mar 2017 16:30:23 +0800 Subject: [PATCH] Shutdown the connection if no more data received --- src/emqttd_client.erl | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index be08b6fed..98db870e7 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -55,7 +55,7 @@ %% Unused fields: connname, peerhost, peerport -record(client_state, {connection, peername, conn_state, await_recv, rate_limit, packet_size, parser, proto_state, - keepalive, enable_stats, force_gc_count}). + keepalive, enable_stats, idle_timeout, force_gc_count}). -define(INFO_KEYS, [peername, conn_state, await_recv]). @@ -114,6 +114,7 @@ do_init(Conn, Env, Peername) -> Parser = emqttd_parser:initial_state(PacketSize), ProtoState = emqttd_protocol:init(Conn, Peername, SendFun, Env), EnableStats = get_value(client_enable_stats, Env, false), + IdleTimout = get_value(client_idle_timeout, Env, 30000), ForceGcCount = emqttd_gc:conn_max_gc_count(), State = run_socket(#client_state{connection = Conn, peername = Peername, @@ -124,8 +125,8 @@ do_init(Conn, Env, Peername) -> parser = Parser, proto_state = ProtoState, enable_stats = EnableStats, + idle_timeout = IdleTimout, force_gc_count = ForceGcCount}), - IdleTimout = get_value(client_idle_timeout, Env, 30000), gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, {backoff, 2000, 2000, 20000}). @@ -275,9 +276,11 @@ handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(Reason, #client_state{connection = Conn, - keepalive = KeepAlive, - proto_state = ProtoState}) -> +terminate(Reason, State = #client_state{connection = Conn, + keepalive = KeepAlive, + proto_state = ProtoState}) -> + + ?LOG(debug, "Terminated for ~p", [Reason], State), Conn:fast_close(), emqttd_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of @@ -300,12 +303,13 @@ code_change(_OldVsn, State, _Extra) -> received(<<>>, State) -> {noreply, gc(State), hibernate}; -received(Bytes, State = #client_state{parser = Parser, - packet_size = PacketSize, - proto_state = ProtoState}) -> +received(Bytes, State = #client_state{parser = Parser, + packet_size = PacketSize, + proto_state = ProtoState, + idle_timeout = IdleTimeout}) -> case catch emqttd_parser:parse(Bytes, Parser) of {more, NewParser} -> - {noreply, run_socket(State#client_state{parser = NewParser}), hibernate}; + {noreply, run_socket(State#client_state{parser = NewParser}), IdleTimeout}; {ok, Packet, Rest} -> emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of