fix(mqtt_sn): send pingresp until all pubacks received

This commit is contained in:
Shawn 2021-04-09 18:55:59 +08:00
parent e3a5f65c88
commit 327b0c0995
3 changed files with 36 additions and 23 deletions

View File

@ -324,7 +324,7 @@ connected(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, RC)}, State) ->
connected(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State) connected(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State)
when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP -> when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP ->
do_pubrec(PubRec, MsgId, State); do_pubrec(PubRec, MsgId, connected, State);
connected(cast, {incoming, ?SN_SUBSCRIBE_MSG(Flags, MsgId, TopicId)}, State) -> connected(cast, {incoming, ?SN_SUBSCRIBE_MSG(Flags, MsgId, TopicId)}, State) ->
#mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType} = Flags, #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType} = Flags,
@ -412,19 +412,19 @@ asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)},
case ClientIdPing of case ClientIdPing of
ClientId -> ClientId ->
inc_ping_counter(), inc_ping_counter(),
case emqx_session:dequeue(emqx_channel:get_session(Channel)) of case emqx_session:replay(emqx_channel:get_session(Channel)) of
{ok, Session0} -> {ok, [], Session0} ->
send_message(?SN_PINGRESP_MSG(), State), send_message(?SN_PINGRESP_MSG(), State),
{keep_state, State#state{ {keep_state, State#state{
channel = emqx_channel:set_session(Session0, Channel)}}; channel = emqx_channel:set_session(Session0, Channel)}};
{ok, Delivers, Session0} -> {ok, Publishes, Session0} ->
Events = [emqx_message:to_packet(PckId, Msg) || {PckId, Msg} <- Delivers] {Packets, Channel1} = emqx_channel:do_deliver(Publishes,
++ [try_goto_asleep], emqx_channel:set_session(Session0, Channel)),
{next_state, awake, State#state{ {next_state, awake,
channel = emqx_channel:set_session(Session0, Channel), State#state{channel = Channel1, has_pending_pingresp = true},
has_pending_pingresp = true}, outgoing_events(Events)} outgoing_events(Packets ++ [try_goto_asleep])}
end; end;
_Other -> _Other ->
{next_state, asleep, State} {next_state, asleep, State}
end; end;
@ -433,7 +433,7 @@ asleep(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
asleep(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State) asleep(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State)
when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP -> when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP ->
do_pubrec(PubRec, MsgId, State); do_pubrec(PubRec, MsgId, asleep, State);
% NOTE: what about following scenario: % NOTE: what about following scenario:
% 1) client go to sleep % 1) client go to sleep
@ -471,15 +471,21 @@ awake(cast, {outgoing, Packet}, State) ->
awake(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> awake(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) ->
do_puback(TopicId, MsgId, ReturnCode, awake, State); do_puback(TopicId, MsgId, ReturnCode, awake, State);
awake(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State)
when PubRec == ?SN_PUBREC; PubRec == ?SN_PUBREL; PubRec == ?SN_PUBCOMP ->
do_pubrec(PubRec, MsgId, awake, State);
awake(cast, try_goto_asleep, State=#state{channel = Channel, awake(cast, try_goto_asleep, State=#state{channel = Channel,
has_pending_pingresp = PingPending}) -> has_pending_pingresp = PingPending}) ->
case emqx_mqueue:is_empty(emqx_session:info(mqueue, emqx_channel:get_session(Channel))) of Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)),
true when PingPending =:= true -> case emqx_inflight:size(Inflight) of
0 when PingPending =:= true ->
send_message(?SN_PINGRESP_MSG(), State), send_message(?SN_PINGRESP_MSG(), State),
goto_asleep_state(State#state{has_pending_pingresp = false}); goto_asleep_state(State#state{has_pending_pingresp = false});
true when PingPending =:= false -> 0 when PingPending =:= false ->
goto_asleep_state(State); goto_asleep_state(State);
false -> keep_state_and_data _Size ->
keep_state_and_data
end; end;
awake(EventType, EventContent, State) -> awake(EventType, EventContent, State) ->
@ -502,7 +508,7 @@ handle_event({call, From}, Req, _StateName, State) ->
handle_event(info, {datagram, SockPid, Data}, StateName, handle_event(info, {datagram, SockPid, Data}, StateName,
State = #state{sockpid = SockPid, channel = _Channel}) -> State = #state{sockpid = SockPid, channel = _Channel}) ->
?LOG(debug, "RECV ~p", [Data]), ?LOG(debug, "RECV ~0p", [Data]),
Oct = iolist_size(Data), Oct = iolist_size(Data),
inc_counter(recv_oct, Oct), inc_counter(recv_oct, Oct),
try emqx_sn_frame:parse(Data) of try emqx_sn_frame:parse(Data) of
@ -520,7 +526,7 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
handle_event(info, {deliver, _Topic, Msg}, asleep, handle_event(info, {deliver, _Topic, Msg}, asleep,
State = #state{channel = Channel}) -> State = #state{channel = Channel}) ->
% section 6.14, Support of sleeping clients % section 6.14, Support of sleeping clients
?LOG(debug, "enqueue downlink message in asleep state Msg=~p", [Msg]), ?LOG(debug, "enqueue downlink message in asleep state Msg=~0p", [Msg]),
Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)), Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)),
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
@ -576,7 +582,7 @@ handle_event(cast, {event, _Other}, _StateName, State = #state{channel = Channel
{keep_state, State}; {keep_state, State};
handle_event(EventType, EventContent, StateName, State) -> handle_event(EventType, EventContent, StateName, State) ->
?LOG(error, "StateName: ~s, Unexpected Event: ~p", ?LOG(error, "StateName: ~s, Unexpected Event: ~0p",
[StateName, {EventType, EventContent}]), [StateName, {EventType, EventContent}]),
{keep_state, State}. {keep_state, State}.
@ -991,8 +997,8 @@ do_puback(TopicId, MsgId, ReturnCode, StateName,
{keep_state, State} {keep_state, State}
end. end.
do_pubrec(PubRec, MsgId, State) -> do_pubrec(PubRec, MsgId, StateName, State) ->
handle_incoming(mqttsn_to_mqtt(PubRec, MsgId), State). handle_incoming(mqttsn_to_mqtt(PubRec, MsgId), StateName, State).
proto_subscribe(TopicName, QoS, MsgId, TopicId, State) -> proto_subscribe(TopicName, QoS, MsgId, TopicId, State) ->
?LOG(debug, "subscribe Topic=~p, MsgId=~p, TopicId=~p", ?LOG(debug, "subscribe Topic=~p, MsgId=~p, TopicId=~p",
@ -1010,7 +1016,7 @@ proto_publish(TopicName, Data, Dup, QoS, Retain, MsgId, TopicId, State) ->
Publish = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = Dup, qos = QoS, retain = Retain}, Publish = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = Dup, qos = QoS, retain = Retain},
variable = #mqtt_packet_publish{topic_name = TopicName, packet_id = MsgId}, variable = #mqtt_packet_publish{topic_name = TopicName, packet_id = MsgId},
payload = Data}, payload = Data},
?LOG(debug, "[publish] Msg: ~p~n", [Publish]), ?LOG(debug, "[publish] Msg: ~0p~n", [Publish]),
handle_incoming(Publish, State). handle_incoming(Publish, State).
update_will_topic(undefined, #mqtt_sn_flags{qos = QoS, retain = Retain}, Topic) -> update_will_topic(undefined, #mqtt_sn_flags{qos = QoS, retain = Retain}, Topic) ->
@ -1048,9 +1054,10 @@ get_topic_id(Type, MsgId) ->
handle_incoming(Packet, State) -> handle_incoming(Packet, State) ->
handle_incoming(Packet, unknown, State). handle_incoming(Packet, unknown, State).
handle_incoming(?PUBACK_PACKET(_) = Packet, awake, State) -> handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake, State) ->
Result = channel_handle_in(Packet, State), Result = channel_handle_in(Packet, State),
handle_return(Result, State, [try_goto_asleep]); handle_return(Result, State, [try_goto_asleep]);
handle_incoming(Packet, _StName, State) -> handle_incoming(Packet, _StName, State) ->
Result = channel_handle_in(Packet, State), Result = channel_handle_in(Packet, State),
handle_return(Result, State). handle_return(Result, State).
@ -1068,7 +1075,7 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload),
State = #state{clientid = ClientId, registry = Registry}) -> State = #state{clientid = ClientId, registry = Registry}) ->
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
MsgId = message_id(PacketId), MsgId = message_id(PacketId),
?LOG(debug, "Handle outgoing: ~p", [PubPkt]), ?LOG(debug, "Handle outgoing: ~0p", [PubPkt]),
(emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) == undefined) (emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) == undefined)
andalso (byte_size(TopicName) =/= 2) andalso (byte_size(TopicName) =/= 2)

View File

@ -1251,7 +1251,10 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) ->
UdpData = wrap_receive_response(Socket), UdpData = wrap_receive_response(Socket),
MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData), MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData),
send_pubrec_msg(Socket, MsgId_udp), send_pubrec_msg(Socket, MsgId_udp),
?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)),
send_pubcomp_msg(Socket, MsgId_udp),
%% verify the pingresp is received after receiving all the buffered qos2 msgs
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
gen_udp:close(Socket). gen_udp:close(Socket).

View File

@ -48,6 +48,9 @@
, terminate/2 , terminate/2
]). ]).
%% Export for emqx_sn
-export([do_deliver/2]).
%% Exports for CT %% Exports for CT
-export([set_field/3]). -export([set_field/3]).