refactor(gw): add bytes & packets metrics

This commit is contained in:
JianBo He 2021-07-23 21:38:56 +08:00
parent 80eb7f313d
commit c36cdf9682
7 changed files with 116 additions and 27 deletions

View File

@ -640,8 +640,13 @@ next_incoming_msgs(Packets) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle incoming packet %% Handle incoming packet
handle_incoming(Packet, State = #state{frame_mod = FrameMod}) -> handle_incoming(Packet, State = #state{
ok = inc_incoming_stats(Packet), channel = Channel,
frame_mod = FrameMod,
chann_mod = ChannMod
}) ->
Ctx = ChannMod:info(ctx, Channel),
ok = inc_incoming_stats(Ctx, FrameMod, Packet),
?LOG(debug, "RECV ~s", [FrameMod:format(Packet)]), ?LOG(debug, "RECV ~s", [FrameMod:format(Packet)]),
with_channel(handle_in, [Packet], State). with_channel(handle_in, [Packet], State).
@ -688,7 +693,7 @@ serialize_and_inc_stats_fun(#state{
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
<<>>; <<>>;
Data -> ?LOG(debug, "SEND ~s", [FrameMod:format(Packet)]), Data -> ?LOG(debug, "SEND ~s", [FrameMod:format(Packet)]),
ok = inc_outgoing_stats(Packet), ok = inc_outgoing_stats(Ctx, FrameMod, Packet),
Data Data
end end
end. end.
@ -796,30 +801,31 @@ close_socket(State = #state{socket = Socket}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Inc incoming/outgoing stats %% Inc incoming/outgoing stats
%% XXX: How to stats? inc_incoming_stats(Ctx, FrameMod, Packet) ->
inc_incoming_stats(_Packet) ->
inc_counter(recv_pkt, 1), inc_counter(recv_pkt, 1),
ok. case FrameMod:is_message(Packet) of
%case Type =:= ?CMD_SEND of true ->
% true -> inc_counter(recv_msg, 1),
% inc_counter(recv_msg, 1), inc_counter(incoming_pubs, 1);
% inc_counter(incoming_pubs, 1); false ->
% false -> ok
% ok end,
%end, Name = list_to_atom(
%emqx_metrics:inc_recv(Packet). lists:concat(["packets.", FrameMod:type(Packet), ".recevied"])),
emqx_gateway_ctx:metrics_inc(Ctx, Name).
inc_outgoing_stats(_Packet) -> inc_outgoing_stats(Ctx, FrameMod, Packet) ->
inc_counter(send_pkt, 1), inc_counter(send_pkt, 1),
ok. case FrameMod:is_message(Packet) of
%case Type =:= ?CMD_MESSAGE of true ->
% true -> inc_counter(send_msg, 1),
% inc_counter(send_msg, 1), inc_counter(outgoing_pubs, 1);
% inc_counter(outgoing_pubs, 1); false ->
% false -> ok
% ok end,
%end, Name = list_to_atom(
%emqx_metrics:inc_sent(Packet). lists:concat(["packets.", FrameMod:type(Packet), ".sent"])),
emqx_gateway_ctx:metrics_inc(Ctx, Name).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions

View File

@ -22,7 +22,6 @@
%% %%
-module(emqx_gateway_frame). -module(emqx_gateway_frame).
-type parse_state() :: map(). -type parse_state() :: map().
-type frame() :: any(). -type frame() :: any().
@ -38,11 +37,21 @@
%% @doc Initial the frame parser states %% @doc Initial the frame parser states
-callback initial_parse_state(map()) -> parse_state(). -callback initial_parse_state(map()) -> parse_state().
%% @doc
-callback serialize_opts() -> serialize_options(). -callback serialize_opts() -> serialize_options().
%% @doc
-callback serialize_pkt(Frame :: any(), serialize_options()) -> iodata(). -callback serialize_pkt(Frame :: any(), serialize_options()) -> iodata().
%% @doc
-callback parse(binary(), parse_state()) -> parse_result(). -callback parse(binary(), parse_state()) -> parse_result().
%% @doc
-callback format(Frame :: any()) -> string(). -callback format(Frame :: any()) -> string().
%% @doc
-callback type(Frame :: any()) -> atom().
%% @doc
-callback is_message(Frame :: any()) -> boolean().

View File

@ -243,6 +243,7 @@ open_session(Type, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -
open_session(_Type, false = _CleanStart, open_session(_Type, false = _CleanStart,
_ClientInfo, _ConnInfo, _CreateSessionFun) -> _ClientInfo, _ConnInfo, _CreateSessionFun) ->
%% TODO:
{error, not_supported_now}. {error, not_supported_now}.
%% @private %% @private
@ -253,7 +254,8 @@ create_session(Type, ClientInfo, ConnInfo, CreateSessionFun) ->
[ClientInfo, ConnInfo] [ClientInfo, ConnInfo]
), ),
ok = emqx_gateway_metrics:inc(Type, 'session.created'), ok = emqx_gateway_metrics:inc(Type, 'session.created'),
SessionInfo = case is_record(Session, emqx_session) of SessionInfo = case is_tuple(Session)
andalso element(1, Session) == session of
true -> emqx_session:info(Session); true -> emqx_session:info(Session);
_ -> _ ->
case is_map(Session) of case is_map(Session) of

View File

@ -25,6 +25,8 @@
, parse/2 , parse/2
, serialize_pkt/2 , serialize_pkt/2
, format/1 , format/1
, is_message/1
, type/1
]). ]).
initial_parse_state(_) -> initial_parse_state(_) ->
@ -41,3 +43,8 @@ serialize_pkt(Data, _Opts) ->
format(Data) -> format(Data) ->
io_lib:format("~p", [Data]). io_lib:format("~p", [Data]).
is_message(_) -> true.
type(_) -> unknown.

View File

@ -348,7 +348,7 @@ handle_in(?SN_SEARCHGW_MSG(_Radius),
handle_in(?SN_ADVERTISE_MSG(_GwId, _Radius), Channel) -> handle_in(?SN_ADVERTISE_MSG(_GwId, _Radius), Channel) ->
% ingore % ingore
{ok, Channel}; shutdown(normal, Channel);
handle_in(?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, handle_in(?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
topic_id_type = TopicIdType topic_id_type = TopicIdType

View File

@ -28,6 +28,8 @@
, serialize_pkt/2 , serialize_pkt/2
, message_type/1 , message_type/1
, format/1 , format/1
, type/1
, is_message/1
]). ]).
-define(flag, 1/binary). -define(flag, 1/binary).
@ -326,3 +328,38 @@ format_flag(#mqtt_sn_flags{dup = Dup, qos = QoS, retain = Retain, will = Will, c
[Dup, QoS, Retain, Will, CleanStart, TopicType]); [Dup, QoS, Retain, Will, CleanStart, TopicType]);
format_flag(_Flag) -> "invalid flag". format_flag(_Flag) -> "invalid flag".
is_message(#mqtt_sn_message{type = Type})
when Type == ?SN_PUBLISH ->
true;
is_message(_) ->
false.
type(#mqtt_sn_message{type = Type}) ->
type(Type);
type(?SN_ADVERTISE) -> advertise;
type(?SN_SEARCHGW) -> serachgw;
type(?SN_GWINFO) -> gwinfo;
type(?SN_CONNECT) -> connect;
type(?SN_CONNACK) -> connack;
type(?SN_WILLTOPICREQ) -> willtopicreq;
type(?SN_WILLTOPIC) -> willtopic;
type(?SN_WILLMSGREQ) -> willmsgreq;
type(?SN_WILLMSG) -> willmsg;
type(?SN_REGISTER) -> register;
type(?SN_REGACK) -> regack;
type(?SN_PUBLISH) -> publish;
type(?SN_PUBACK) -> puback;
type(?SN_PUBCOMP) -> pubcomp;
type(?SN_PUBREC) -> pubrec;
type(?SN_PUBREL) -> pubrel;
type(?SN_SUBSCRIBE) -> subscribe;
type(?SN_SUBACK) -> suback;
type(?SN_UNSUBSCRIBE) -> unsubscribe;
type(?SN_UNSUBACK) -> unsuback;
type(?SN_PINGREQ) -> pingreq;
type(?SN_PINGRESP) -> pingresp;
type(?SN_DISCONNECT) -> disconnect;
type(?SN_WILLTOPICUPD) -> willtopicupd;
type(?SN_WILLTOPICRESP) -> willtopicresp;
type(?SN_WILLMSGUPD) -> willmsgupd;
type(?SN_WILLMSGRESP) -> willmsgresp.

View File

@ -84,6 +84,10 @@
, format/1 , format/1
]). ]).
-export([ type/1
, is_message/1
]).
-define(NULL, 0). -define(NULL, 0).
-define(CR, $\r). -define(CR, $\r).
-define(LF, $\n). -define(LF, $\n).
@ -290,3 +294,27 @@ make(Command, Headers, Body) ->
%% @doc Format a frame %% @doc Format a frame
format(Frame) -> serialize_pkt(Frame, #{}). format(Frame) -> serialize_pkt(Frame, #{}).
is_message(#stomp_frame{command = CMD})
when CMD == ?CMD_SEND;
CMD == ?CMD_MESSAGE ->
true;
is_message(_) -> false.
type(#stomp_frame{command = CMD}) ->
type(CMD);
type(?CMD_STOMP) -> connect;
type(?CMD_CONNECT) -> connect;
type(?CMD_SEND) -> send;
type(?CMD_SUBSCRIBE) -> subscribe;
type(?CMD_UNSUBSCRIBE) -> unsubscribe;
type(?CMD_BEGIN) -> 'begin';
type(?CMD_COMMIT) -> commit;
type(?CMD_ABORT) -> abort;
type(?CMD_ACK) -> ack;
type(?CMD_NACK) -> nack;
type(?CMD_DISCONNECT) -> disconnect;
type(?CMD_CONNECTED) -> connected;
type(?CMD_MESSAGE) -> message;
type(?CMD_RECEIPT) -> receipt;
type(?CMD_ERROR) -> error.