fix(gw): correct stats/1 for mqttsn&stomp

This commit is contained in:
JianBo He 2021-07-22 10:20:28 +08:00
parent 709f6a535b
commit c6b3447598
2 changed files with 12 additions and 7 deletions

View File

@ -172,7 +172,7 @@ info(conn_state, #channel{conn_state = ConnState}) ->
info(clientinfo, #channel{clientinfo = ClientInfo}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo; ClientInfo;
info(session, #channel{session = Session}) -> info(session, #channel{session = Session}) ->
Session; emqx_misc:maybe_apply(fun emqx_session:info/1, Session);
info(will_msg, #channel{will_msg = WillMsg}) -> info(will_msg, #channel{will_msg = WillMsg}) ->
WillMsg; WillMsg;
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
@ -180,8 +180,9 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
info(ctx, #channel{ctx = Ctx}) -> info(ctx, #channel{ctx = Ctx}) ->
Ctx. Ctx.
stats(_Channel) -> -spec(stats(channel()) -> emqx_types:stats()).
[]. stats(#channel{session = Session})->
emqx_session:stats(Session).
set_conn_state(ConnState, Channel) -> set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}. Channel#channel{conn_state = ConnState}.
@ -591,7 +592,7 @@ handle_in(SubPkt = ?SN_SUBSCRIBE_MSG(_, MsgId, _), Channel) ->
{ok, {TopicId, GrantedQoS}, NChannel} -> {ok, {TopicId, GrantedQoS}, NChannel} ->
SubAck = ?SN_SUBACK_MSG(#mqtt_sn_flags{qos = GrantedQoS}, SubAck = ?SN_SUBACK_MSG(#mqtt_sn_flags{qos = GrantedQoS},
TopicId, MsgId, ?SN_RC_ACCEPTED), TopicId, MsgId, ?SN_RC_ACCEPTED),
{ok, {outgoing, SubAck}, NChannel}; {ok, outgoing_and_update(SubAck), NChannel};
{error, ReturnCode, NChannel} -> {error, ReturnCode, NChannel} ->
SubAck = ?SN_SUBACK_MSG(#mqtt_sn_flags{}, SubAck = ?SN_SUBACK_MSG(#mqtt_sn_flags{},
?SN_INVALID_TOPIC_ID, ?SN_INVALID_TOPIC_ID,
@ -608,7 +609,7 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName),
], UnsubPkt, Channel) of ], UnsubPkt, Channel) of
{ok, _TopicName, NChannel} -> {ok, _TopicName, NChannel} ->
UnsubAck = ?SN_UNSUBACK_MSG(MsgId), UnsubAck = ?SN_UNSUBACK_MSG(MsgId),
{ok, {outgoing, UnsubAck}, NChannel}; {ok, outgoing_and_update(UnsubAck), NChannel};
{error, Reason, NChannel} -> {error, Reason, NChannel} ->
?LOG(warning, "Unsubscribe ~p failed: ~0p", ?LOG(warning, "Unsubscribe ~p failed: ~0p",
[TopicIdOrName, Reason]), [TopicIdOrName, Reason]),
@ -668,6 +669,9 @@ after_message_acked(ClientInfo, Msg,
'message.acked', 'message.acked',
[ClientInfo, emqx_message:set_header(puback_props, #{}, Msg)]). [ClientInfo, emqx_message:set_header(puback_props, #{}, Msg)]).
outgoing_and_update(Pkt) ->
[{outgoing, Pkt}, {event, update}].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle Publish %% Handle Publish

View File

@ -166,8 +166,9 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
info(ctx, #channel{ctx = Ctx}) -> info(ctx, #channel{ctx = Ctx}) ->
Ctx. Ctx.
stats(_Channel) -> -spec(stats(channel()) -> emqx_types:stats()).
[]. stats(#channel{subscriptions = Subs}) ->
[{subscriptions_cnt, length(Subs)}].
set_conn_state(ConnState, Channel) -> set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}. Channel#channel{conn_state = ConnState}.