fix(stomp): counting packets and messages

This commit is contained in:
JianBo He 2021-11-03 13:29:03 +08:00
parent 0a7f04caa3
commit af7b5704ab
2 changed files with 55 additions and 20 deletions

View File

@ -205,7 +205,11 @@ exit_on_sock_error(timeout) ->
exit_on_sock_error(Reason) -> exit_on_sock_error(Reason) ->
erlang:exit({shutdown, Reason}). erlang:exit({shutdown, Reason}).
send(Data, Transport, Sock, ConnPid) -> send(Frame, Transport, Sock, ConnPid) ->
?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]),
ok = inc_outgoing_stats(Frame),
Data = emqx_stomp_frame:serialize(Frame),
?LOG(debug, "SEND ~p", [Data]),
try Transport:async_send(Sock, Data) of try Transport:async_send(Sock, Data) of
ok -> ok; ok -> ok;
{error, Reason} -> ConnPid ! {shutdown, Reason} {error, Reason} -> ConnPid ! {shutdown, Reason}
@ -386,6 +390,7 @@ received(Bytes, State = #state{parser = Parser,
noreply(State#state{parser = NewParser}); noreply(State#state{parser = NewParser});
{ok, Frame, Rest} -> {ok, Frame, Rest} ->
?LOG(info, "RECV Frame: ~s", [emqx_stomp_frame:format(Frame)]), ?LOG(info, "RECV Frame: ~s", [emqx_stomp_frame:format(Frame)]),
ok = inc_incoming_stats(Frame),
case emqx_stomp_protocol:received(Frame, PState) of case emqx_stomp_protocol:received(Frame, PState) of
{ok, PState1} -> {ok, PState1} ->
received(Rest, reset_parser(State#state{pstate = PState1})); received(Rest, reset_parser(State#state{pstate = PState1}));
@ -425,6 +430,35 @@ close_socket(State = #state{transport = Transport, socket = Socket}) ->
ok = Transport:fast_close(Socket), ok = Transport:fast_close(Socket),
State#state{sockstate = closed}. State#state{sockstate = closed}.
%%--------------------------------------------------------------------
%% Inc incoming/outgoing stats
inc_incoming_stats(#stomp_frame{command = Cmd}) ->
inc_counter(recv_pkt, 1),
case Cmd of
<<"SEND">> ->
inc_counter(recv_msg, 1),
inc_counter(incoming_pubs, 1),
emqx_metrics:inc('messages.received'),
emqx_metrics:inc('messages.qos1.received');
_ ->
ok
end,
emqx_metrics:inc('packets.received').
inc_outgoing_stats(#stomp_frame{command = Cmd}) ->
inc_counter(send_pkt, 1),
case Cmd of
<<"MESSAGE">> ->
inc_counter(send_msg, 1),
inc_counter(outgoing_pubs, 1),
emqx_metrics:inc('messages.sent'),
emqx_metrics:inc('messages.qos1.sent');
_ ->
ok
end,
emqx_metrics:inc('packets.sent').
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Ensure rate limit %% Ensure rate limit

View File

@ -231,27 +231,24 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
default_user(State) default_user(State)
) of ) of
true -> true ->
NLogin = case Login == undefined orelse Login == <<>> of ClientId = emqx_guid:to_base62(emqx_guid:gen()),
false -> Login; emqx_logger:set_metadata_clientid(ClientId),
true -> emqx_guid:to_base62(emqx_guid:gen())
end,
emqx_logger:set_metadata_clientid(NLogin),
ConnInfo = State#pstate.conninfo, ConnInfo = State#pstate.conninfo,
ClitInfo = State#pstate.clientinfo, ClitInfo = State#pstate.clientinfo,
NConnInfo = ConnInfo#{ NConnInfo = ConnInfo#{
proto_ver => Version, proto_ver => Version,
clientid => NLogin, clientid => ClientId,
username => NLogin username => Login
}, },
NClitInfo = ClitInfo#{ NClitInfo = ClitInfo#{
clientid => NLogin, clientid => ClientId,
username => NLogin username => Login
}, },
ConnPid = self(), ConnPid = self(),
_ = emqx_cm_locker:trans(NLogin, fun(_) -> _ = emqx_cm_locker:trans(ClientId, fun(_) ->
emqx_cm:discard_session(NLogin), emqx_cm:discard_session(ClientId),
emqx_cm:register_channel(NLogin, ConnPid, NConnInfo) emqx_cm:register_channel(ClientId, ConnPid, NConnInfo)
end), end),
Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)), Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)),
NState = start_heartbeart_timer( NState = start_heartbeart_timer(
@ -406,8 +403,13 @@ received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) ->
_ = maybe_send_receipt(receipt_id(Headers), State), _ = maybe_send_receipt(receipt_id(Headers), State),
{stop, normal, State}. {stop, normal, State}.
send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, send(Msg0 = #message{},
State = #pstate{subscriptions = Subs}) -> State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) ->
ok = emqx_metrics:inc('messages.delivered'),
Msg = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg0),
#message{topic = Topic,
headers = Headers,
payload = Payload} = Msg,
case find_sub_by_topic(Topic, Subs) of case find_sub_by_topic(Topic, Subs) of
{Topic, #{sub_props := #{id := Id, ack := Ack}}} -> {Topic, #{sub_props := #{id := Id, ack := Ack}}} ->
Headers0 = [{<<"subscription">>, Id}, Headers0 = [{<<"subscription">>, Id},
@ -423,6 +425,8 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
Frame = #stomp_frame{command = <<"MESSAGE">>, Frame = #stomp_frame{command = <<"MESSAGE">>,
headers = Headers1 ++ maps:get(stomp_headers, Headers, []), headers = Headers1 ++ maps:get(stomp_headers, Headers, []),
body = Payload}, body = Payload},
send(Frame, State); send(Frame, State);
undefined -> undefined ->
?LOG(error, "Stomp dropped: ~p", [Msg]), ?LOG(error, "Stomp dropped: ~p", [Msg]),
@ -430,10 +434,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
end; end;
send(Frame, State = #pstate{sendfun = {Fun, Args}}) when is_record(Frame, stomp_frame) -> send(Frame, State = #pstate{sendfun = {Fun, Args}}) when is_record(Frame, stomp_frame) ->
?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]), erlang:apply(Fun, [Frame] ++ Args),
Data = emqx_stomp_frame:serialize(Frame),
?LOG(debug, "SEND ~p", [Data]),
erlang:apply(Fun, [Data] ++ Args),
{ok, State}. {ok, State}.
shutdown(Reason, State = #pstate{connected = true}) -> shutdown(Reason, State = #pstate{connected = true}) ->
@ -453,7 +454,7 @@ timeout(_TRef, {incoming, NewVal},
timeout(_TRef, {outgoing, NewVal}, timeout(_TRef, {outgoing, NewVal},
State = #pstate{heart_beats = HrtBt, State = #pstate{heart_beats = HrtBt,
heartfun = {Fun, Args}}) -> heartfun = {Fun, Args}}) ->
case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
{error, timeout} -> {error, timeout} ->
_ = erlang:apply(Fun, Args), _ = erlang:apply(Fun, Args),