diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 180edc8f0..0331be936 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -134,7 +134,6 @@ websocket_init(#state{request = Req, options = Options}) -> Zone = proplists:get_value(zone, Options), EnableStats = emqx_zone:get_env(Zone, enable_stats, true), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), - lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), {ok, #state{peername = Peername, @@ -149,14 +148,14 @@ send_fun(WsPid) -> Data = emqx_frame:serialize(Packet, Options), BinSize = iolist_size(Data), emqx_metrics:trans(inc, 'bytes/sent', BinSize), - put(send_oct, get(send_oct) + BinSize), - put(send_cnt, get(send_cnt) + 1), + emqx_pd:update_counter(send_cnt, 1), + emqx_pd:update_counter(send_oct, BinSize), WsPid ! {binary, iolist_to_binary(Data)}, ok end. stat_fun() -> - fun() -> {ok, get(recv_oct)} end. + fun() -> {ok, emqx_pd:get_counter(recv_oct)} end. websocket_handle({binary, <<>>}, State) -> {ok, ensure_stats_timer(State)}; @@ -164,16 +163,16 @@ websocket_handle({binary, [<<>>]}, State) -> {ok, ensure_stats_timer(State)}; websocket_handle({binary, Data}, State = #state{parser_state = ParserState, proto_state = ProtoState}) -> - BinSize = iolist_size(Data), - put(recv_oct, get(recv_oct) + BinSize), ?LOG(debug, "RECV ~p", [Data]), + BinSize = iolist_size(Data), + emqx_pd:update_counter(recv_oct, BinSize), emqx_metrics:trans(inc, 'bytes/received', BinSize), - case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of - {more, NewParserState} -> - {ok, State#state{parser_state = NewParserState}}; + try emqx_frame:parse(iolist_to_binary(Data), ParserState) of + {more, ParserState1} -> + {ok, State#state{parser_state = ParserState1}}; {ok, Packet, Rest} -> emqx_metrics:received(Packet), - put(recv_cnt, get(recv_cnt) + 1), + emqx_pd:update_counter(recv_cnt, 1), case emqx_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1})); @@ -187,9 +186,10 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, end; {error, Error} -> ?LOG(error, "Frame error: ~p", [Error]), - stop(Error, State); - {'EXIT', Reason} -> - ?LOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data]), + stop(Error, State) + catch + _:Error -> + ?LOG(error, "Frame error:~p~nFrame data: ~p", [Error, Data]), shutdown(parse_error, State) end. @@ -303,4 +303,5 @@ stop(Error, State) -> {stop, State#state{shutdown = Error}}. wsock_stats() -> - [{Key, get(Key)} || Key <- ?SOCK_STATS]. + [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS]. +