From af7b5704ab4ab26c94921c263d412d1d0a0d78eb Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 3 Nov 2021 13:29:03 +0800 Subject: [PATCH] fix(stomp): counting packets and messages --- apps/emqx_stomp/src/emqx_stomp_connection.erl | 36 ++++++++++++++++- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 39 ++++++++++--------- 2 files changed, 55 insertions(+), 20 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index 7ec58a6f6..b2157c7ce 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -205,7 +205,11 @@ exit_on_sock_error(timeout) -> exit_on_sock_error(Reason) -> erlang:exit({shutdown, Reason}). -send(Data, Transport, Sock, ConnPid) -> +send(Frame, Transport, Sock, ConnPid) -> + ?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]), + ok = inc_outgoing_stats(Frame), + Data = emqx_stomp_frame:serialize(Frame), + ?LOG(debug, "SEND ~p", [Data]), try Transport:async_send(Sock, Data) of ok -> ok; {error, Reason} -> ConnPid ! {shutdown, Reason} @@ -386,6 +390,7 @@ received(Bytes, State = #state{parser = Parser, noreply(State#state{parser = NewParser}); {ok, Frame, Rest} -> ?LOG(info, "RECV Frame: ~s", [emqx_stomp_frame:format(Frame)]), + ok = inc_incoming_stats(Frame), case emqx_stomp_protocol:received(Frame, PState) of {ok, PState1} -> received(Rest, reset_parser(State#state{pstate = PState1})); @@ -425,6 +430,35 @@ close_socket(State = #state{transport = Transport, socket = Socket}) -> ok = Transport:fast_close(Socket), State#state{sockstate = closed}. +%%-------------------------------------------------------------------- +%% Inc incoming/outgoing stats + +inc_incoming_stats(#stomp_frame{command = Cmd}) -> + inc_counter(recv_pkt, 1), + case Cmd of + <<"SEND">> -> + inc_counter(recv_msg, 1), + inc_counter(incoming_pubs, 1), + emqx_metrics:inc('messages.received'), + emqx_metrics:inc('messages.qos1.received'); + _ -> + ok + end, + emqx_metrics:inc('packets.received'). + +inc_outgoing_stats(#stomp_frame{command = Cmd}) -> + inc_counter(send_pkt, 1), + case Cmd of + <<"MESSAGE">> -> + inc_counter(send_msg, 1), + inc_counter(outgoing_pubs, 1), + emqx_metrics:inc('messages.sent'), + emqx_metrics:inc('messages.qos1.sent'); + _ -> + ok + end, + emqx_metrics:inc('packets.sent'). + %%-------------------------------------------------------------------- %% Ensure rate limit diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index 0dbfbb13b..06740430c 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -231,27 +231,24 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers}, default_user(State) ) of true -> - NLogin = case Login == undefined orelse Login == <<>> of - false -> Login; - true -> emqx_guid:to_base62(emqx_guid:gen()) - end, - emqx_logger:set_metadata_clientid(NLogin), + ClientId = emqx_guid:to_base62(emqx_guid:gen()), + emqx_logger:set_metadata_clientid(ClientId), ConnInfo = State#pstate.conninfo, ClitInfo = State#pstate.clientinfo, NConnInfo = ConnInfo#{ proto_ver => Version, - clientid => NLogin, - username => NLogin + clientid => ClientId, + username => Login }, NClitInfo = ClitInfo#{ - clientid => NLogin, - username => NLogin + clientid => ClientId, + username => Login }, ConnPid = self(), - _ = emqx_cm_locker:trans(NLogin, fun(_) -> - emqx_cm:discard_session(NLogin), - emqx_cm:register_channel(NLogin, ConnPid, NConnInfo) + _ = emqx_cm_locker:trans(ClientId, fun(_) -> + emqx_cm:discard_session(ClientId), + emqx_cm:register_channel(ClientId, ConnPid, NConnInfo) end), Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)), NState = start_heartbeart_timer( @@ -406,8 +403,13 @@ received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) -> _ = maybe_send_receipt(receipt_id(Headers), State), {stop, normal, State}. -send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, - State = #pstate{subscriptions = Subs}) -> +send(Msg0 = #message{}, + State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) -> + ok = emqx_metrics:inc('messages.delivered'), + Msg = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg0), + #message{topic = Topic, + headers = Headers, + payload = Payload} = Msg, case find_sub_by_topic(Topic, Subs) of {Topic, #{sub_props := #{id := Id, ack := Ack}}} -> Headers0 = [{<<"subscription">>, Id}, @@ -423,6 +425,8 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, Frame = #stomp_frame{command = <<"MESSAGE">>, headers = Headers1 ++ maps:get(stomp_headers, Headers, []), body = Payload}, + + send(Frame, State); undefined -> ?LOG(error, "Stomp dropped: ~p", [Msg]), @@ -430,10 +434,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, end; send(Frame, State = #pstate{sendfun = {Fun, Args}}) when is_record(Frame, stomp_frame) -> - ?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]), - Data = emqx_stomp_frame:serialize(Frame), - ?LOG(debug, "SEND ~p", [Data]), - erlang:apply(Fun, [Data] ++ Args), + erlang:apply(Fun, [Frame] ++ Args), {ok, State}. shutdown(Reason, State = #pstate{connected = true}) -> @@ -453,7 +454,7 @@ timeout(_TRef, {incoming, NewVal}, timeout(_TRef, {outgoing, NewVal}, State = #pstate{heart_beats = HrtBt, - heartfun = {Fun, Args}}) -> + heartfun = {Fun, Args}}) -> case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of {error, timeout} -> _ = erlang:apply(Fun, Args),