From 327b0c0995fecfd21d933b1c6cb43ab98631f275 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 9 Apr 2021 18:55:59 +0800 Subject: [PATCH] fix(mqtt_sn): send pingresp until all pubacks received --- apps/emqx_sn/src/emqx_sn_gateway.erl | 53 +++++++++++--------- apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl | 3 ++ src/emqx_channel.erl | 3 ++ 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 0b0e7a217..bd343e038 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -324,7 +324,7 @@ connected(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, RC)}, State) -> connected(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State) when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP -> - do_pubrec(PubRec, MsgId, State); + do_pubrec(PubRec, MsgId, connected, State); connected(cast, {incoming, ?SN_SUBSCRIBE_MSG(Flags, MsgId, TopicId)}, State) -> #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType} = Flags, @@ -412,19 +412,19 @@ asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)}, case ClientIdPing of ClientId -> inc_ping_counter(), - case emqx_session:dequeue(emqx_channel:get_session(Channel)) of - {ok, Session0} -> + case emqx_session:replay(emqx_channel:get_session(Channel)) of + {ok, [], Session0} -> send_message(?SN_PINGRESP_MSG(), State), {keep_state, State#state{ channel = emqx_channel:set_session(Session0, Channel)}}; - {ok, Delivers, Session0} -> - Events = [emqx_message:to_packet(PckId, Msg) || {PckId, Msg} <- Delivers] - ++ [try_goto_asleep], - {next_state, awake, State#state{ - channel = emqx_channel:set_session(Session0, Channel), - has_pending_pingresp = true}, outgoing_events(Events)} + {ok, Publishes, Session0} -> + {Packets, Channel1} = emqx_channel:do_deliver(Publishes, + emqx_channel:set_session(Session0, Channel)), + {next_state, awake, + State#state{channel = Channel1, has_pending_pingresp = true}, + outgoing_events(Packets ++ [try_goto_asleep])} end; - _Other -> + _Other -> {next_state, asleep, State} end; @@ -433,7 +433,7 @@ asleep(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> asleep(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State) when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP -> - do_pubrec(PubRec, MsgId, State); + do_pubrec(PubRec, MsgId, asleep, State); % NOTE: what about following scenario: % 1) client go to sleep @@ -471,15 +471,21 @@ awake(cast, {outgoing, Packet}, State) -> awake(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> do_puback(TopicId, MsgId, ReturnCode, awake, State); +awake(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State) + when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP -> + do_pubrec(PubRec, MsgId, awake, State); + awake(cast, try_goto_asleep, State=#state{channel = Channel, has_pending_pingresp = PingPending}) -> - case emqx_mqueue:is_empty(emqx_session:info(mqueue, emqx_channel:get_session(Channel))) of - true when PingPending =:= true -> + Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)), + case emqx_inflight:size(Inflight) of + 0 when PingPending =:= true -> send_message(?SN_PINGRESP_MSG(), State), goto_asleep_state(State#state{has_pending_pingresp = false}); - true when PingPending =:= false -> + 0 when PingPending =:= false -> goto_asleep_state(State); - false -> keep_state_and_data + _Size -> + keep_state_and_data end; awake(EventType, EventContent, State) -> @@ -502,7 +508,7 @@ handle_event({call, From}, Req, _StateName, State) -> handle_event(info, {datagram, SockPid, Data}, StateName, State = #state{sockpid = SockPid, channel = _Channel}) -> - ?LOG(debug, "RECV ~p", [Data]), + ?LOG(debug, "RECV ~0p", [Data]), Oct = iolist_size(Data), inc_counter(recv_oct, Oct), try emqx_sn_frame:parse(Data) of @@ -520,7 +526,7 @@ handle_event(info, {datagram, SockPid, Data}, StateName, handle_event(info, {deliver, _Topic, Msg}, asleep, State = #state{channel = Channel}) -> % section 6.14, Support of sleeping clients - ?LOG(debug, "enqueue downlink message in asleep state Msg=~p", [Msg]), + ?LOG(debug, "enqueue downlink message in asleep state Msg=~0p", [Msg]), Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; @@ -576,7 +582,7 @@ handle_event(cast, {event, _Other}, _StateName, State = #state{channel = Channel {keep_state, State}; handle_event(EventType, EventContent, StateName, State) -> - ?LOG(error, "StateName: ~s, Unexpected Event: ~p", + ?LOG(error, "StateName: ~s, Unexpected Event: ~0p", [StateName, {EventType, EventContent}]), {keep_state, State}. @@ -991,8 +997,8 @@ do_puback(TopicId, MsgId, ReturnCode, StateName, {keep_state, State} end. -do_pubrec(PubRec, MsgId, State) -> - handle_incoming(mqttsn_to_mqtt(PubRec, MsgId), State). +do_pubrec(PubRec, MsgId, StateName, State) -> + handle_incoming(mqttsn_to_mqtt(PubRec, MsgId), StateName, State). proto_subscribe(TopicName, QoS, MsgId, TopicId, State) -> ?LOG(debug, "subscribe Topic=~p, MsgId=~p, TopicId=~p", @@ -1010,7 +1016,7 @@ proto_publish(TopicName, Data, Dup, QoS, Retain, MsgId, TopicId, State) -> Publish = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = Dup, qos = QoS, retain = Retain}, variable = #mqtt_packet_publish{topic_name = TopicName, packet_id = MsgId}, payload = Data}, - ?LOG(debug, "[publish] Msg: ~p~n", [Publish]), + ?LOG(debug, "[publish] Msg: ~0p~n", [Publish]), handle_incoming(Publish, State). update_will_topic(undefined, #mqtt_sn_flags{qos = QoS, retain = Retain}, Topic) -> @@ -1048,9 +1054,10 @@ get_topic_id(Type, MsgId) -> handle_incoming(Packet, State) -> handle_incoming(Packet, unknown, State). -handle_incoming(?PUBACK_PACKET(_) = Packet, awake, State) -> +handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake, State) -> Result = channel_handle_in(Packet, State), handle_return(Result, State, [try_goto_asleep]); + handle_incoming(Packet, _StName, State) -> Result = channel_handle_in(Packet, State), handle_return(Result, State). @@ -1068,7 +1075,7 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload), State = #state{clientid = ClientId, registry = Registry}) -> #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, MsgId = message_id(PacketId), - ?LOG(debug, "Handle outgoing: ~p", [PubPkt]), + ?LOG(debug, "Handle outgoing: ~0p", [PubPkt]), (emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) == undefined) andalso (byte_size(TopicName) =/= 2) diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 5717539b1..b2999e442 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -1251,7 +1251,10 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) -> UdpData = wrap_receive_response(Socket), MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData), send_pubrec_msg(Socket, MsgId_udp), + ?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)), + send_pubcomp_msg(Socket, MsgId_udp), + %% verify the pingresp is received after receiving all the buffered qos2 msgs ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), gen_udp:close(Socket). diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 321d05bd4..229ffa5cb 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -48,6 +48,9 @@ , terminate/2 ]). +%% Export for emqx_sn +-export([do_deliver/2]). + %% Exports for CT -export([set_field/3]).