diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index c77c23c49..9b256df0b 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -151,16 +151,16 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> - _ = publish(uptime, iolist_to_binary(uptime(State))), - _ = publish(datetime, iolist_to_binary(datetime())), + publish_any(uptime, iolist_to_binary(uptime(State))), + publish_any(datetime, iolist_to_binary(datetime())), {noreply, heartbeat(State)}; handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Version, sysdescr = Descr}) -> - _ = publish(version, Version), - _ = publish(sysdescr, Descr), - _ = publish(brokers, ekka_mnesia:running_nodes()), - _ = publish(stats, emqx_stats:getstats()), - _ = publish(metrics, emqx_metrics:all()), + publish_any(version, Version), + publish_any(sysdescr, Descr), + publish_any(brokers, ekka_mnesia:running_nodes()), + publish_any(stats, emqx_stats:getstats()), + publish_any(metrics, emqx_metrics:all()), {noreply, tick(State), hibernate}; handle_info(Info, State) -> @@ -192,6 +192,10 @@ uptime(hours, H) -> uptime(days, D) -> [integer_to_list(D), " days, "]. +publish_any(Name, Value) -> + _ = publish(Name, Value), + ok. + publish(uptime, Uptime) -> safe_publish(systop(uptime), Uptime); publish(datetime, Datetime) -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index b87251640..8e65785c6 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -568,16 +568,16 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> ]}). inc_recv_stats(Cnt, Oct) -> - _ = emqx_pd:inc_counter(incoming_bytes, Oct), - _ = emqx_pd:inc_counter(recv_cnt, Cnt), - _ = emqx_pd:inc_counter(recv_oct, Oct), + inc_counter(incoming_bytes, Oct), + inc_counter(recv_cnt, Cnt), + inc_counter(recv_oct, Oct), emqx_metrics:inc('bytes.received', Oct). inc_incoming_stats(Packet = ?PACKET(Type)) -> _ = emqx_pd:inc_counter(recv_pkt, 1), if Type == ?PUBLISH -> - _ = emqx_pd:inc_counter(recv_msg, 1), - _ = emqx_pd:inc_counter(incoming_pubs, 1); + inc_counter(recv_msg, 1), + inc_counter(incoming_pubs, 1); true -> ok end, emqx_metrics:inc_recv(Packet). @@ -585,18 +585,21 @@ inc_incoming_stats(Packet = ?PACKET(Type)) -> inc_outgoing_stats(Packet = ?PACKET(Type)) -> _ = emqx_pd:inc_counter(send_pkt, 1), if Type == ?PUBLISH -> - _ = emqx_pd:inc_counter(send_msg, 1), - _ = emqx_pd:inc_counter(outgoing_pubs, 1); + inc_counter(send_msg, 1), + inc_counter(outgoing_pubs, 1); true -> ok end, emqx_metrics:inc_sent(Packet). inc_sent_stats(Cnt, Oct) -> - _ = emqx_pd:inc_counter(outgoing_bytes, Oct), - _ = emqx_pd:inc_counter(send_cnt, Cnt), - _ = emqx_pd:inc_counter(send_oct, Oct), + inc_counter(outgoing_bytes, Oct), + inc_counter(send_cnt, Cnt), + inc_counter(send_oct, Oct), emqx_metrics:inc('bytes.sent', Oct). +inc_counter(Name, Value) -> + _ = emqx_pd:inc_counter(Name, Value), + ok. %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------