Replace put/2 with emqx_pd:update_counter/2 (#2098)

This commit is contained in:
Feng Lee 2018-12-21 17:47:49 +08:00 committed by turtleDeng
parent e949e8cbd8
commit f31e7f8bde
1 changed files with 15 additions and 14 deletions

View File

@ -134,7 +134,6 @@ websocket_init(#state{request = Req, options = Options}) ->
Zone = proplists:get_value(zone, Options), Zone = proplists:get_value(zone, Options),
EnableStats = emqx_zone:get_env(Zone, enable_stats, true), EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS),
emqx_logger:set_metadata_peername(esockd_net:format(Peername)), emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
{ok, #state{peername = Peername, {ok, #state{peername = Peername,
@ -149,14 +148,14 @@ send_fun(WsPid) ->
Data = emqx_frame:serialize(Packet, Options), Data = emqx_frame:serialize(Packet, Options),
BinSize = iolist_size(Data), BinSize = iolist_size(Data),
emqx_metrics:trans(inc, 'bytes/sent', BinSize), emqx_metrics:trans(inc, 'bytes/sent', BinSize),
put(send_oct, get(send_oct) + BinSize), emqx_pd:update_counter(send_cnt, 1),
put(send_cnt, get(send_cnt) + 1), emqx_pd:update_counter(send_oct, BinSize),
WsPid ! {binary, iolist_to_binary(Data)}, WsPid ! {binary, iolist_to_binary(Data)},
ok ok
end. end.
stat_fun() -> stat_fun() ->
fun() -> {ok, get(recv_oct)} end. fun() -> {ok, emqx_pd:get_counter(recv_oct)} end.
websocket_handle({binary, <<>>}, State) -> websocket_handle({binary, <<>>}, State) ->
{ok, ensure_stats_timer(State)}; {ok, ensure_stats_timer(State)};
@ -164,16 +163,16 @@ websocket_handle({binary, [<<>>]}, State) ->
{ok, ensure_stats_timer(State)}; {ok, ensure_stats_timer(State)};
websocket_handle({binary, Data}, State = #state{parser_state = ParserState, websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
BinSize = iolist_size(Data),
put(recv_oct, get(recv_oct) + BinSize),
?LOG(debug, "RECV ~p", [Data]), ?LOG(debug, "RECV ~p", [Data]),
BinSize = iolist_size(Data),
emqx_pd:update_counter(recv_oct, BinSize),
emqx_metrics:trans(inc, 'bytes/received', BinSize), emqx_metrics:trans(inc, 'bytes/received', BinSize),
case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of try emqx_frame:parse(iolist_to_binary(Data), ParserState) of
{more, NewParserState} -> {more, ParserState1} ->
{ok, State#state{parser_state = NewParserState}}; {ok, State#state{parser_state = ParserState1}};
{ok, Packet, Rest} -> {ok, Packet, Rest} ->
emqx_metrics:received(Packet), emqx_metrics:received(Packet),
put(recv_cnt, get(recv_cnt) + 1), emqx_pd:update_counter(recv_cnt, 1),
case emqx_protocol:received(Packet, ProtoState) of case emqx_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} -> {ok, ProtoState1} ->
websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1})); websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
@ -187,9 +186,10 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
end; end;
{error, Error} -> {error, Error} ->
?LOG(error, "Frame error: ~p", [Error]), ?LOG(error, "Frame error: ~p", [Error]),
stop(Error, State); stop(Error, State)
{'EXIT', Reason} -> catch
?LOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data]), _:Error ->
?LOG(error, "Frame error:~p~nFrame data: ~p", [Error, Data]),
shutdown(parse_error, State) shutdown(parse_error, State)
end. end.
@ -303,4 +303,5 @@ stop(Error, State) ->
{stop, State#state{shutdown = Error}}. {stop, State#state{shutdown = Error}}.
wsock_stats() -> wsock_stats() ->
[{Key, get(Key)} || Key <- ?SOCK_STATS]. [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].