diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 2ab7f6c1e..e59fd2223 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -239,6 +239,7 @@ run_loop(Parent, State = #state{transport = Transport, exit_on_sock_error(Reason) end. +-spec exit_on_sock_error(any()) -> no_return(). exit_on_sock_error(Reason) when Reason =:= einval; Reason =:= enotconn; Reason =:= closed -> @@ -330,7 +331,7 @@ handle_msg({'$gen_call', From, Req}, State) -> handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> ?LOG(debug, "RECV ~0p", [Data]), Oct = iolist_size(Data), - emqx_pd:inc_counter(incoming_bytes, Oct), + inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), parse_incoming(Data, State); @@ -426,10 +427,11 @@ handle_msg(Msg, State) -> %%-------------------------------------------------------------------- %% Terminate +-spec terminate(any(), state()) -> no_return(). terminate(Reason, State = #state{channel = Channel}) -> ?LOG(debug, "Terminated due to ~p", [Reason]), emqx_channel:terminate(Reason, Channel), - close_socket(State), + _ = close_socket(State), exit(Reason). %%-------------------------------------------------------------------- @@ -597,7 +599,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> send(IoData, #state{transport = Transport, socket = Socket}) -> Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), - emqx_pd:inc_counter(outgoing_bytes, Oct), + inc_counter(outgoing_bytes, Oct), case Transport:async_send(Socket, IoData) of ok -> ok; Error = {error, _Reason} -> @@ -693,23 +695,25 @@ close_socket(State = #state{transport = Transport, socket = Socket}) -> -compile({inline, [inc_incoming_stats/1]}). inc_incoming_stats(Packet = ?PACKET(Type)) -> - emqx_pd:inc_counter(recv_pkt, 1), - if - Type == ?PUBLISH -> - emqx_pd:inc_counter(recv_msg, 1), - emqx_pd:inc_counter(incoming_pubs, 1); - true -> ok + inc_counter(recv_pkt, 1), + case Type =:= ?PUBLISH of + true -> + inc_counter(recv_msg, 1), + inc_counter(incoming_pubs, 1); + false -> + ok end, emqx_metrics:inc_recv(Packet). -compile({inline, [inc_outgoing_stats/1]}). inc_outgoing_stats(Packet = ?PACKET(Type)) -> - emqx_pd:inc_counter(send_pkt, 1), - if - Type == ?PUBLISH -> - emqx_pd:inc_counter(send_msg, 1), - emqx_pd:inc_counter(outgoing_pubs, 1); - true -> ok + inc_counter(send_pkt, 1), + case Type =:= ?PUBLISH of + true -> + inc_counter(send_msg, 1), + inc_counter(outgoing_pubs, 1); + false -> + ok end, emqx_metrics:inc_sent(Packet). @@ -738,6 +742,10 @@ stop(Reason, State) -> stop(Reason, Reply, State) -> {stop, Reason, Reply, State}. +inc_counter(Key, Inc) -> + _ = emqx_pd:inc_counter(Key, Inc), + ok. + %%-------------------------------------------------------------------- %% For CT tests %%--------------------------------------------------------------------