From aade94711c52f95648c0d615f61b7a523b5409bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 21 Sep 2018 18:50:26 +0800 Subject: [PATCH] Use process dictionaries to record last packet timestamp --- src/emqx_connection.erl | 13 +++++++------ src/emqx_ws_connection.erl | 16 +++++++++------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 14412ab95..6c1ab3e66 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -45,8 +45,7 @@ rate_limit, publish_limit, limit_timer, - idle_timeout, - last_packet_ts = 0 + idle_timeout }). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -253,8 +252,8 @@ handle_info({keepalive, start, Interval}, State) -> shutdown(Error, State) end; -handle_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) -> - case emqx_keepalive:check(KeepAlive, LastPacketTs) of +handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> + case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of {ok, KeepAlive1} -> {noreply, State#state{keepalive = KeepAlive1}}; {error, timeout} -> @@ -291,14 +290,16 @@ code_change(_OldVsn, State, _Extra) -> %% Receive and parse data handle_packet(<<>>, State) -> - {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State#state{last_packet_ts = erlang:system_time(millisecond)})))}; + put(last_packet_ts, erlang:system_time(millisecond)), + {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))}; handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, idle_timeout = IdleTimeout}) -> case catch emqx_frame:parse(Data, ParserState) of {more, NewParserState} -> - {noreply, run_socket(State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}), IdleTimeout}; + put(last_packet_ts, erlang:system_time(millisecond)), + {noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 3a2d4fdae..2526392ee 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -40,8 +40,7 @@ keepalive, enable_stats, stats_timer, - shutdown, - last_packet_ts = 0 + shutdown }). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). @@ -154,9 +153,11 @@ send_fun(WsPid) -> end. websocket_handle({binary, <<>>}, State) -> - {ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})}; + put(last_packet_ts, erlang:system_time(millisecond)), + {ok, ensure_stats_timer(State)}; websocket_handle({binary, [<<>>]}, State) -> - {ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})}; + put(last_packet_ts, erlang:system_time(millisecond)), + {ok, ensure_stats_timer(State)}; websocket_handle({binary, Data}, State = #state{parser_state = ParserState, proto_state = ProtoState}) -> BinSize = iolist_size(Data), @@ -165,7 +166,8 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, emqx_metrics:inc('bytes/received', BinSize), case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of {more, NewParserState} -> - {ok, State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}}; + put(last_packet_ts, erlang:system_time(millisecond)), + {ok, State#state{parser_state = NewParserState}}; {ok, Packet, Rest} -> emqx_metrics:received(Packet), put(recv_cnt, get(recv_cnt) + 1), @@ -231,8 +233,8 @@ websocket_info({keepalive, start, Interval}, State) -> shutdown(Error, State) end; -websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) -> - case emqx_keepalive:check(KeepAlive, LastPacketTs) of +websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> + case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} ->