From c36cdf9682926833155cc2b847c409af8ed09313 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 23 Jul 2021 21:38:56 +0800 Subject: [PATCH] refactor(gw): add bytes & packets metrics --- .../src/bhvrs/emqx_gateway_conn.erl | 54 ++++++++++--------- .../src/bhvrs/emqx_gateway_frame.erl | 11 +++- apps/emqx_gateway/src/emqx_gateway_cm.erl | 4 +- .../src/exproto/emqx_exproto_frame.erl | 7 +++ .../src/mqttsn/emqx_sn_channel.erl | 2 +- .../emqx_gateway/src/mqttsn/emqx_sn_frame.erl | 37 +++++++++++++ .../src/stomp/emqx_stomp_frame.erl | 28 ++++++++++ 7 files changed, 116 insertions(+), 27 deletions(-) diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index d9980b829..7349ec310 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -640,8 +640,13 @@ next_incoming_msgs(Packets) -> %%-------------------------------------------------------------------- %% Handle incoming packet -handle_incoming(Packet, State = #state{frame_mod = FrameMod}) -> - ok = inc_incoming_stats(Packet), +handle_incoming(Packet, State = #state{ + channel = Channel, + frame_mod = FrameMod, + chann_mod = ChannMod + }) -> + Ctx = ChannMod:info(ctx, Channel), + ok = inc_incoming_stats(Ctx, FrameMod, Packet), ?LOG(debug, "RECV ~s", [FrameMod:format(Packet)]), with_channel(handle_in, [Packet], State). @@ -688,7 +693,7 @@ serialize_and_inc_stats_fun(#state{ ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), <<>>; Data -> ?LOG(debug, "SEND ~s", [FrameMod:format(Packet)]), - ok = inc_outgoing_stats(Packet), + ok = inc_outgoing_stats(Ctx, FrameMod, Packet), Data end end. @@ -796,30 +801,31 @@ close_socket(State = #state{socket = Socket}) -> %%-------------------------------------------------------------------- %% Inc incoming/outgoing stats -%% XXX: How to stats? -inc_incoming_stats(_Packet) -> +inc_incoming_stats(Ctx, FrameMod, Packet) -> inc_counter(recv_pkt, 1), - ok. - %case Type =:= ?CMD_SEND of - % true -> - % inc_counter(recv_msg, 1), - % inc_counter(incoming_pubs, 1); - % false -> - % ok - %end, - %emqx_metrics:inc_recv(Packet). + case FrameMod:is_message(Packet) of + true -> + inc_counter(recv_msg, 1), + inc_counter(incoming_pubs, 1); + false -> + ok + end, + Name = list_to_atom( + lists:concat(["packets.", FrameMod:type(Packet), ".recevied"])), + emqx_gateway_ctx:metrics_inc(Ctx, Name). -inc_outgoing_stats(_Packet) -> +inc_outgoing_stats(Ctx, FrameMod, Packet) -> inc_counter(send_pkt, 1), - ok. - %case Type =:= ?CMD_MESSAGE of - % true -> - % inc_counter(send_msg, 1), - % inc_counter(outgoing_pubs, 1); - % false -> - % ok - %end, - %emqx_metrics:inc_sent(Packet). + case FrameMod:is_message(Packet) of + true -> + inc_counter(send_msg, 1), + inc_counter(outgoing_pubs, 1); + false -> + ok + end, + Name = list_to_atom( + lists:concat(["packets.", FrameMod:type(Packet), ".sent"])), + emqx_gateway_ctx:metrics_inc(Ctx, Name). %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl index 4cce837aa..87410f7d8 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_frame.erl @@ -22,7 +22,6 @@ %% -module(emqx_gateway_frame). - -type parse_state() :: map(). -type frame() :: any(). @@ -38,11 +37,21 @@ %% @doc Initial the frame parser states -callback initial_parse_state(map()) -> parse_state(). +%% @doc -callback serialize_opts() -> serialize_options(). +%% @doc -callback serialize_pkt(Frame :: any(), serialize_options()) -> iodata(). +%% @doc -callback parse(binary(), parse_state()) -> parse_result(). +%% @doc -callback format(Frame :: any()) -> string(). +%% @doc +-callback type(Frame :: any()) -> atom(). + +%% @doc +-callback is_message(Frame :: any()) -> boolean(). + diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 651c54f4b..e4d21e1a5 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -243,6 +243,7 @@ open_session(Type, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun) - open_session(_Type, false = _CleanStart, _ClientInfo, _ConnInfo, _CreateSessionFun) -> + %% TODO: {error, not_supported_now}. %% @private @@ -253,7 +254,8 @@ create_session(Type, ClientInfo, ConnInfo, CreateSessionFun) -> [ClientInfo, ConnInfo] ), ok = emqx_gateway_metrics:inc(Type, 'session.created'), - SessionInfo = case is_record(Session, emqx_session) of + SessionInfo = case is_tuple(Session) + andalso element(1, Session) == session of true -> emqx_session:info(Session); _ -> case is_map(Session) of diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_frame.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_frame.erl index 656766b9b..ecd6273ca 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_frame.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_frame.erl @@ -25,6 +25,8 @@ , parse/2 , serialize_pkt/2 , format/1 + , is_message/1 + , type/1 ]). initial_parse_state(_) -> @@ -41,3 +43,8 @@ serialize_pkt(Data, _Opts) -> format(Data) -> io_lib:format("~p", [Data]). + +is_message(_) -> true. + +type(_) -> unknown. + diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 4441a30a3..d6a868da8 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -348,7 +348,7 @@ handle_in(?SN_SEARCHGW_MSG(_Radius), handle_in(?SN_ADVERTISE_MSG(_GwId, _Radius), Channel) -> % ingore - {ok, Channel}; + shutdown(normal, Channel); handle_in(?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, topic_id_type = TopicIdType diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl index 343232854..32d1a21a2 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl @@ -28,6 +28,8 @@ , serialize_pkt/2 , message_type/1 , format/1 + , type/1 + , is_message/1 ]). -define(flag, 1/binary). @@ -326,3 +328,38 @@ format_flag(#mqtt_sn_flags{dup = Dup, qos = QoS, retain = Retain, will = Will, c [Dup, QoS, Retain, Will, CleanStart, TopicType]); format_flag(_Flag) -> "invalid flag". +is_message(#mqtt_sn_message{type = Type}) + when Type == ?SN_PUBLISH -> + true; +is_message(_) -> + false. + +type(#mqtt_sn_message{type = Type}) -> + type(Type); +type(?SN_ADVERTISE) -> advertise; +type(?SN_SEARCHGW) -> serachgw; +type(?SN_GWINFO) -> gwinfo; +type(?SN_CONNECT) -> connect; +type(?SN_CONNACK) -> connack; +type(?SN_WILLTOPICREQ) -> willtopicreq; +type(?SN_WILLTOPIC) -> willtopic; +type(?SN_WILLMSGREQ) -> willmsgreq; +type(?SN_WILLMSG) -> willmsg; +type(?SN_REGISTER) -> register; +type(?SN_REGACK) -> regack; +type(?SN_PUBLISH) -> publish; +type(?SN_PUBACK) -> puback; +type(?SN_PUBCOMP) -> pubcomp; +type(?SN_PUBREC) -> pubrec; +type(?SN_PUBREL) -> pubrel; +type(?SN_SUBSCRIBE) -> subscribe; +type(?SN_SUBACK) -> suback; +type(?SN_UNSUBSCRIBE) -> unsubscribe; +type(?SN_UNSUBACK) -> unsuback; +type(?SN_PINGREQ) -> pingreq; +type(?SN_PINGRESP) -> pingresp; +type(?SN_DISCONNECT) -> disconnect; +type(?SN_WILLTOPICUPD) -> willtopicupd; +type(?SN_WILLTOPICRESP) -> willtopicresp; +type(?SN_WILLMSGUPD) -> willmsgupd; +type(?SN_WILLMSGRESP) -> willmsgresp. diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index c003da078..fd05abfe6 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -84,6 +84,10 @@ , format/1 ]). +-export([ type/1 + , is_message/1 + ]). + -define(NULL, 0). -define(CR, $\r). -define(LF, $\n). @@ -290,3 +294,27 @@ make(Command, Headers, Body) -> %% @doc Format a frame format(Frame) -> serialize_pkt(Frame, #{}). + +is_message(#stomp_frame{command = CMD}) + when CMD == ?CMD_SEND; + CMD == ?CMD_MESSAGE -> + true; +is_message(_) -> false. + +type(#stomp_frame{command = CMD}) -> + type(CMD); +type(?CMD_STOMP) -> connect; +type(?CMD_CONNECT) -> connect; +type(?CMD_SEND) -> send; +type(?CMD_SUBSCRIBE) -> subscribe; +type(?CMD_UNSUBSCRIBE) -> unsubscribe; +type(?CMD_BEGIN) -> 'begin'; +type(?CMD_COMMIT) -> commit; +type(?CMD_ABORT) -> abort; +type(?CMD_ACK) -> ack; +type(?CMD_NACK) -> nack; +type(?CMD_DISCONNECT) -> disconnect; +type(?CMD_CONNECTED) -> connected; +type(?CMD_MESSAGE) -> message; +type(?CMD_RECEIPT) -> receipt; +type(?CMD_ERROR) -> error.