Some forgotten changes in last commit
This commit is contained in:
parent
ea62b15c87
commit
c87aabbbeb
|
@ -137,10 +137,6 @@ websocket_init(#state{request = Req, options = Options}) ->
|
||||||
lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS),
|
lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS),
|
||||||
|
|
||||||
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
|
emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
|
||||||
MetricCommitInterval = emqx_config:get_env(metric_commit_interval, 10000),
|
|
||||||
emqx_metrics:start_timer(MetricCommitInterval,
|
|
||||||
MetricCommitInterval div 2,
|
|
||||||
{metric_commit, MetricCommitInterval}),
|
|
||||||
{ok, #state{peername = Peername,
|
{ok, #state{peername = Peername,
|
||||||
sockname = Sockname,
|
sockname = Sockname,
|
||||||
parser_state = ParserState,
|
parser_state = ParserState,
|
||||||
|
@ -152,7 +148,7 @@ send_fun(WsPid) ->
|
||||||
fun(Packet, Options) ->
|
fun(Packet, Options) ->
|
||||||
Data = emqx_frame:serialize(Packet, Options),
|
Data = emqx_frame:serialize(Packet, Options),
|
||||||
BinSize = iolist_size(Data),
|
BinSize = iolist_size(Data),
|
||||||
emqx_metrics:inc('bytes/sent', BinSize),
|
emqx_metrics:trans(inc, 'bytes/sent', BinSize),
|
||||||
put(send_oct, get(send_oct) + BinSize),
|
put(send_oct, get(send_oct) + BinSize),
|
||||||
put(send_cnt, get(send_cnt) + 1),
|
put(send_cnt, get(send_cnt) + 1),
|
||||||
WsPid ! {binary, iolist_to_binary(Data)},
|
WsPid ! {binary, iolist_to_binary(Data)},
|
||||||
|
@ -171,7 +167,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
||||||
BinSize = iolist_size(Data),
|
BinSize = iolist_size(Data),
|
||||||
put(recv_oct, get(recv_oct) + BinSize),
|
put(recv_oct, get(recv_oct) + BinSize),
|
||||||
?LOG(debug, "RECV ~p", [Data]),
|
?LOG(debug, "RECV ~p", [Data]),
|
||||||
emqx_metrics:inc('bytes/received', BinSize),
|
emqx_metrics:trans(inc, 'bytes/received', BinSize),
|
||||||
case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
|
case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
|
||||||
{more, NewParserState} ->
|
{more, NewParserState} ->
|
||||||
{ok, State#state{parser_state = NewParserState}};
|
{ok, State#state{parser_state = NewParserState}};
|
||||||
|
@ -227,14 +223,10 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
|
||||||
|
|
||||||
websocket_info({timeout, Timer, emit_stats},
|
websocket_info({timeout, Timer, emit_stats},
|
||||||
State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
|
State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
|
||||||
|
emqx_metrics:commit(),
|
||||||
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
|
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
|
||||||
{ok, State#state{stats_timer = undefined}, hibernate};
|
{ok, State#state{stats_timer = undefined}, hibernate};
|
||||||
|
|
||||||
websocket_info({timeout, _Timer, {metric_commit, MetricCommitInterval}}, State) ->
|
|
||||||
emqx_metrics:commit(),
|
|
||||||
emqx_metrics:start_timer(MetricCommitInterval, {metric_commit, MetricCommitInterval}),
|
|
||||||
{ok, State, hibernate};
|
|
||||||
|
|
||||||
websocket_info({keepalive, start, Interval}, State) ->
|
websocket_info({keepalive, start, Interval}, State) ->
|
||||||
?LOG(debug, "Keepalive at the interval of ~p", [Interval]),
|
?LOG(debug, "Keepalive at the interval of ~p", [Interval]),
|
||||||
case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
|
case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
|
||||||
|
|
Loading…
Reference in New Issue