Use process dictionaries to record last packet timestamp

This commit is contained in:
周子博 2018-09-21 18:50:26 +08:00
parent b1d4ec750a
commit aade94711c
2 changed files with 16 additions and 13 deletions

View File

@ -45,8 +45,7 @@
rate_limit, rate_limit,
publish_limit, publish_limit,
limit_timer, limit_timer,
idle_timeout, idle_timeout
last_packet_ts = 0
}). }).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -253,8 +252,8 @@ handle_info({keepalive, start, Interval}, State) ->
shutdown(Error, State) shutdown(Error, State)
end; end;
handle_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) -> handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
case emqx_keepalive:check(KeepAlive, LastPacketTs) of case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of
{ok, KeepAlive1} -> {ok, KeepAlive1} ->
{noreply, State#state{keepalive = KeepAlive1}}; {noreply, State#state{keepalive = KeepAlive1}};
{error, timeout} -> {error, timeout} ->
@ -291,14 +290,16 @@ code_change(_OldVsn, State, _Extra) ->
%% Receive and parse data %% Receive and parse data
handle_packet(<<>>, State) -> handle_packet(<<>>, State) ->
{noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State#state{last_packet_ts = erlang:system_time(millisecond)})))}; put(last_packet_ts, erlang:system_time(millisecond)),
{noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))};
handle_packet(Data, State = #state{proto_state = ProtoState, handle_packet(Data, State = #state{proto_state = ProtoState,
parser_state = ParserState, parser_state = ParserState,
idle_timeout = IdleTimeout}) -> idle_timeout = IdleTimeout}) ->
case catch emqx_frame:parse(Data, ParserState) of case catch emqx_frame:parse(Data, ParserState) of
{more, NewParserState} -> {more, NewParserState} ->
{noreply, run_socket(State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}), IdleTimeout}; put(last_packet_ts, erlang:system_time(millisecond)),
{noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout};
{ok, Packet = ?PACKET(Type), Rest} -> {ok, Packet = ?PACKET(Type), Rest} ->
emqx_metrics:received(Packet), emqx_metrics:received(Packet),
case emqx_protocol:received(Packet, ProtoState) of case emqx_protocol:received(Packet, ProtoState) of

View File

@ -40,8 +40,7 @@
keepalive, keepalive,
enable_stats, enable_stats,
stats_timer, stats_timer,
shutdown, shutdown
last_packet_ts = 0
}). }).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
@ -154,9 +153,11 @@ send_fun(WsPid) ->
end. end.
websocket_handle({binary, <<>>}, State) -> websocket_handle({binary, <<>>}, State) ->
{ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})}; put(last_packet_ts, erlang:system_time(millisecond)),
{ok, ensure_stats_timer(State)};
websocket_handle({binary, [<<>>]}, State) -> websocket_handle({binary, [<<>>]}, State) ->
{ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})}; put(last_packet_ts, erlang:system_time(millisecond)),
{ok, ensure_stats_timer(State)};
websocket_handle({binary, Data}, State = #state{parser_state = ParserState, websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
BinSize = iolist_size(Data), BinSize = iolist_size(Data),
@ -165,7 +166,8 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
emqx_metrics:inc('bytes/received', BinSize), emqx_metrics:inc('bytes/received', BinSize),
case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
{more, NewParserState} -> {more, NewParserState} ->
{ok, State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}}; put(last_packet_ts, erlang:system_time(millisecond)),
{ok, State#state{parser_state = NewParserState}};
{ok, Packet, Rest} -> {ok, Packet, Rest} ->
emqx_metrics:received(Packet), emqx_metrics:received(Packet),
put(recv_cnt, get(recv_cnt) + 1), put(recv_cnt, get(recv_cnt) + 1),
@ -231,8 +233,8 @@ websocket_info({keepalive, start, Interval}, State) ->
shutdown(Error, State) shutdown(Error, State)
end; end;
websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) -> websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
case emqx_keepalive:check(KeepAlive, LastPacketTs) of case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of
{ok, KeepAlive1} -> {ok, KeepAlive1} ->
{ok, State#state{keepalive = KeepAlive1}}; {ok, State#state{keepalive = KeepAlive1}};
{error, timeout} -> {error, timeout} ->