From c6b34475988b0a3cbfbc4e53b134d4a6afb50b9e Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 22 Jul 2021 10:20:28 +0800 Subject: [PATCH] fix(gw): correct stats/1 for mqttsn&stomp --- apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl | 14 +++++++++----- apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl | 5 +++-- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 23d5fc030..e4875bb36 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -172,7 +172,7 @@ info(conn_state, #channel{conn_state = ConnState}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> - Session; + emqx_misc:maybe_apply(fun emqx_session:info/1, Session); info(will_msg, #channel{will_msg = WillMsg}) -> WillMsg; info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> @@ -180,8 +180,9 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> info(ctx, #channel{ctx = Ctx}) -> Ctx. -stats(_Channel) -> - []. +-spec(stats(channel()) -> emqx_types:stats()). +stats(#channel{session = Session})-> + emqx_session:stats(Session). set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. @@ -591,7 +592,7 @@ handle_in(SubPkt = ?SN_SUBSCRIBE_MSG(_, MsgId, _), Channel) -> {ok, {TopicId, GrantedQoS}, NChannel} -> SubAck = ?SN_SUBACK_MSG(#mqtt_sn_flags{qos = GrantedQoS}, TopicId, MsgId, ?SN_RC_ACCEPTED), - {ok, {outgoing, SubAck}, NChannel}; + {ok, outgoing_and_update(SubAck), NChannel}; {error, ReturnCode, NChannel} -> SubAck = ?SN_SUBACK_MSG(#mqtt_sn_flags{}, ?SN_INVALID_TOPIC_ID, @@ -608,7 +609,7 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName), ], UnsubPkt, Channel) of {ok, _TopicName, NChannel} -> UnsubAck = ?SN_UNSUBACK_MSG(MsgId), - {ok, {outgoing, UnsubAck}, NChannel}; + {ok, outgoing_and_update(UnsubAck), NChannel}; {error, Reason, NChannel} -> ?LOG(warning, "Unsubscribe ~p failed: ~0p", [TopicIdOrName, Reason]), @@ -668,6 +669,9 @@ after_message_acked(ClientInfo, Msg, 'message.acked', [ClientInfo, emqx_message:set_header(puback_props, #{}, Msg)]). +outgoing_and_update(Pkt) -> + [{outgoing, Pkt}, {event, update}]. + %%-------------------------------------------------------------------- %% Handle Publish diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 7dc2b2fbf..d4b4c7fe1 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -166,8 +166,9 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> info(ctx, #channel{ctx = Ctx}) -> Ctx. -stats(_Channel) -> - []. +-spec(stats(channel()) -> emqx_types:stats()). +stats(#channel{subscriptions = Subs}) -> + [{subscriptions_cnt, length(Subs)}]. set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}.