diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 3ae706edb..d03dbc782 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -33,6 +33,7 @@ File format: * Fix Server-KeepAlive wrongly applied on MQTT v3.0/v3.1 [#7085] * Fix Stomp client can not trigger `$event/client_connection` message [#7096] * Fix system memory false alarm at boot +* Fix the MQTT-SN message replay when the topic is not registered to the client [#6970] ## v4.3.12 ### Important changes diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src index ef0d24bf9..319137fed 100644 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ b/apps/emqx_sn/src/emqx_sn.app.src @@ -1,6 +1,6 @@ {application, emqx_sn, [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.3.5"}, % strict semver, bump manually! + {vsn, "4.3.6"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,esockd]}, diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 749a72956..6a4eb66d1 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,29 +1,25 @@ %% -*- mode: erlang -*- {VSN, [ - {"4.3.4",[ + {<<"4\\.3\\.[4-5]">>,[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} ]}, - {"4.3.3",[ - {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} - ]}, - {"4.3.2", [ + {<<"4.3.[2-3]">>,[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ], [ - {"4.3.4",[ + {<<"4\\.3\\.[4-5]">>,[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} ]}, - {"4.3.3",[ - {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} - ]}, - {"4.3.2", [ + {<<"4.3.[2-3]">>,[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} diff --git a/apps/emqx_sn/src/emqx_sn_frame.erl b/apps/emqx_sn/src/emqx_sn_frame.erl index eed32803d..28a20956e 100644 --- a/apps/emqx_sn/src/emqx_sn_frame.erl +++ b/apps/emqx_sn/src/emqx_sn_frame.erl @@ -268,40 +268,81 @@ message_type(16#1d) -> message_type(Type) -> io_lib:format("Unknown Type ~p", [Type]). +format(?SN_CONNECT_MSG(Flags, ProtocolId, Duration, ClientId)) -> + #mqtt_sn_flags{ + will = Will, + clean_start = CleanStart} = Flags, + io_lib:format("SN_CONNECT(W~w, C~w, ProtocolId=~w, Duration=~w, " + "ClientId=~s)", + [bool(Will), bool(CleanStart), + ProtocolId, Duration, ClientId]); +format(?SN_CONNACK_MSG(ReturnCode)) -> + io_lib:format("SN_CONNACK(ReturnCode=~w)", [ReturnCode]); +format(?SN_WILLTOPICREQ_MSG()) -> + "SN_WILLTOPICREQ()"; +format(?SN_WILLTOPIC_MSG(Flags, Topic)) -> + #mqtt_sn_flags{ + qos = QoS, + retain = Retain} = Flags, + io_lib:format("SN_WILLTOPIC(Q~w, R~w, Topic=~s)", + [QoS, bool(Retain), Topic]); +format(?SN_WILLTOPIC_EMPTY_MSG) -> + "SN_WILLTOPIC(_)"; +format(?SN_WILLMSGREQ_MSG()) -> + "SN_WILLMSGREQ()"; +format(?SN_WILLMSG_MSG(Msg)) -> + io_lib:format("SN_WILLMSG_MSG(Msg=~p)", [Msg]); format(?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)) -> - io_lib:format("mqtt_sn_message SN_PUBLISH, ~s, TopicId=~w, MsgId=~w, Payload=~w", - [format_flag(Flags), TopicId, MsgId, Data]); -format(?SN_PUBACK_MSG(Flags, MsgId, ReturnCode)) -> - io_lib:format("mqtt_sn_message SN_PUBACK, ~s, MsgId=~w, ReturnCode=~w", - [format_flag(Flags), MsgId, ReturnCode]); + #mqtt_sn_flags{ + dup = Dup, + qos = QoS, + retain = Retain, + topic_id_type = TopicIdType} = Flags, + io_lib:format("SN_PUBLISH(D~w, Q~w, R~w, TopicIdType=~w, TopicId=~w, " + "MsgId=~w, Payload=~p)", + [bool(Dup), QoS, bool(Retain), + TopicIdType, TopicId, MsgId, Data]); +format(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)) -> + io_lib:format("SN_PUBACK(TopicId=~w, MsgId=~w, ReturnCode=~w)", + [TopicId, MsgId, ReturnCode]); format(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)) -> - io_lib:format("mqtt_sn_message SN_PUBCOMP, MsgId=~w", [MsgId]); + io_lib:format("SN_PUBCOMP(MsgId=~w)", [MsgId]); format(?SN_PUBREC_MSG(?SN_PUBREC, MsgId)) -> - io_lib:format("mqtt_sn_message SN_PUBREC, MsgId=~w", [MsgId]); + io_lib:format("SN_PUBREC(MsgId=~w)", [MsgId]); format(?SN_PUBREC_MSG(?SN_PUBREL, MsgId)) -> - io_lib:format("mqtt_sn_message SN_PUBREL, MsgId=~w", [MsgId]); + io_lib:format("SN_PUBREL(MsgId=~w)", [MsgId]); format(?SN_SUBSCRIBE_MSG(Flags, Msgid, Topic)) -> - io_lib:format("mqtt_sn_message SN_SUBSCRIBE, ~s, MsgId=~w, TopicId=~w", - [format_flag(Flags), Msgid, Topic]); + #mqtt_sn_flags{ + dup = Dup, + qos = QoS, + topic_id_type = TopicIdType} = Flags, + io_lib:format("SN_SUBSCRIBE(D~w, Q~w, TopicIdType=~w, MsgId=~w, " + "TopicId=~w)", + [bool(Dup), QoS, TopicIdType, Msgid, Topic]); format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) -> - io_lib:format("mqtt_sn_message SN_SUBACK, ~s, MsgId=~w, TopicId=~w, ReturnCode=~w", - [format_flag(Flags), MsgId, TopicId, ReturnCode]); + #mqtt_sn_flags{qos = QoS} = Flags, + io_lib:format("SN_SUBACK(GrantedQoS=~w, MsgId=~w, TopicId=~w, " + "ReturnCode=~w)", + [QoS, MsgId, TopicId, ReturnCode]); format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) -> - io_lib:format("mqtt_sn_message SN_UNSUBSCRIBE, ~s, MsgId=~w, TopicId=~w", - [format_flag(Flags), Msgid, Topic]); + #mqtt_sn_flags{topic_id_type = TopicIdType} = Flags, + io_lib:format("SN_UNSUBSCRIBE(TopicIdType=~s, MsgId=~w, TopicId=~w)", + [TopicIdType, Msgid, Topic]); format(?SN_UNSUBACK_MSG(MsgId)) -> - io_lib:format("mqtt_sn_message SN_UNSUBACK, MsgId=~w", [MsgId]); + io_lib:format("SN_UNSUBACK(MsgId=~w)", [MsgId]); format(?SN_REGISTER_MSG(TopicId, MsgId, TopicName)) -> - io_lib:format("mqtt_sn_message SN_REGISTER, TopicId=~w, MsgId=~w, TopicName=~w", + io_lib:format("SN_REGISTER(TopicId=~w, MsgId=~w, TopicName=~s)", [TopicId, MsgId, TopicName]); format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) -> - io_lib:format("mqtt_sn_message SN_REGACK, TopicId=~w, MsgId=~w, ReturnCode=~w", + io_lib:format("SN_REGACK(TopicId=~w, MsgId=~w, ReturnCode=~w)", [TopicId, MsgId, ReturnCode]); +format(?SN_PINGREQ_MSG(ClientId)) -> + io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); +format(?SN_PINGRESP_MSG()) -> + "SN_PINGREQ()"; +format(?SN_DISCONNECT_MSG(Duration)) -> + io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); + format(#mqtt_sn_message{type = Type, variable = Var}) -> - io_lib:format("mqtt_sn_message type=~s, Var=~w", [emqx_sn_frame:message_type(Type), Var]). - -format_flag(#mqtt_sn_flags{dup = Dup, qos = QoS, retain = Retain, will = Will, clean_start = CleanStart, topic_id_type = TopicType}) -> - io_lib:format("mqtt_sn_flags{dup=~p, qos=~p, retain=~p, will=~p, clean_session=~p, topic_id_type=~p}", - [Dup, QoS, Retain, Will, CleanStart, TopicType]); -format_flag(_Flag) -> "invalid flag". - + io_lib:format("mqtt_sn_message(type=~s, Var=~w)", + [emqx_sn_frame:message_type(Type), Var]). diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 442aa1db8..265607229 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -94,6 +94,8 @@ idle_timeout :: integer(), enable_qos3 = false :: boolean(), has_pending_pingresp = false :: boolean(), + %% Store all qos0 messages for waiting REGACK + %% Note: QoS1/QoS2 messages will kept inflight queue pending_topic_ids = #{} :: pending_msgs() }). @@ -490,7 +492,7 @@ handle_event({call, From}, Req, _StateName, State) -> {reply, Reply, NState} -> gen_server:reply(From, Reply), {keep_state, NState}; - {stop, Reason, Reply, NState} -> + {shutdown, Reason, Reply, NState} -> State0 = case NState#state.sockstate of running -> send_message(?SN_DISCONNECT_MSG(undefined), NState); @@ -518,10 +520,9 @@ handle_event(info, {datagram, SockPid, Data}, StateName, end; handle_event(info, {deliver, _Topic, Msg}, asleep, - State = #state{channel = Channel, pending_topic_ids = Pendings}) -> + State = #state{channel = Channel}) -> % section 6.14, Support of sleeping clients - ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p", - [Msg, Pendings]), + ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]), Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; @@ -610,7 +611,7 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> {reply, Reply, NChannel} -> {reply, Reply, State#state{channel = NChannel}}; {shutdown, Reason, Reply, NChannel} -> - stop(Reason, Reply, State#state{channel = NChannel}) + {shutdown, Reason, Reply, State#state{channel = NChannel}} end. handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) -> @@ -723,11 +724,19 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) -> mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> ?SN_UNSUBACK_MSG(MsgId); -mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) -> - NewPacketId = if QoS =:= ?QOS_0 -> 0; +mqtt2sn( + #mqtt_packet{header = #mqtt_packet_header{ + type = ?PUBLISH, + qos = QoS, + %dup = Dup, + retain = Retain}, + variable = #mqtt_packet_publish{ + topic_name = Topic, + packet_id = PacketId}, + payload = Payload}, #state{clientid = ClientId}) -> + NPacketId = if QoS =:= ?QOS_0 -> 0; true -> PacketId end, - ClientId = emqx_channel:info(clientid, Channel), {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of {predef, PredefTopicId} -> {?SN_PREDEFINED_TOPIC, PredefTopicId}; @@ -737,8 +746,12 @@ mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channe {?SN_SHORT_TOPIC, Topic} end, - Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType}, - ?SN_PUBLISH_MSG(Flags, TopicContent, NewPacketId, Payload); + Flags = #mqtt_sn_flags{ + %dup = Dup, + qos = QoS, + retain = Retain, + topic_id_type = TopicIdType}, + ?SN_PUBLISH_MSG(Flags, TopicContent, NPacketId, Payload); mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)-> % if success, suback is sent by handle_info({suback, MsgId, [GrantedQoS]}, ...) @@ -766,9 +779,10 @@ send_connack(State) -> send_message(Msg = #mqtt_sn_message{type = Type}, State = #state{sockpid = SockPid, peername = Peername}) -> - ?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]), + ?LOG(info, "SEND ~s~n", [emqx_sn_frame:format(Msg)]), inc_outgoing_stats(Type), Data = emqx_sn_frame:serialize(Msg), + ?LOG(debug, "SEND ~0p", [Data]), ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)), SockPid ! {datagram, Peername, Data}, State. @@ -793,13 +807,6 @@ stop(Reason, State) -> maybe_send_will_msg(Reason, State), {stop, {shutdown, Reason}, State}. -stop({shutdown, Reason}, Reply, State) -> - stop(Reason, Reply, State); -stop(Reason, Reply, State) -> - ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]), - maybe_send_will_msg(Reason, State), - {stop, {shutdown, Reason}, Reply, State}. - maybe_send_will_msg(normal, _State) -> ok; maybe_send_will_msg(_Reason, State) -> @@ -825,6 +832,9 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> %% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message %% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange %% before it could start a new level 1 or 2 transaction. + %% + %% FIXME: But we should have a re-try timer to re-send the inflight + %% qos1/qos2 message OnlyOneInflight = #{'Receive-Maximum' => 1}, ConnPkt = #mqtt_packet_connect{clientid = ClientId, clean_start = CleanStart, @@ -973,11 +983,16 @@ do_puback(TopicId, MsgId, ReturnCode, StateName, case emqx_sn_registry:lookup_topic(ClientId, TopicId) of undefined -> {keep_state, State}; TopicName -> - %%notice that this TopicName maybe normal or predefined, - %% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels - {keep_state, send_register(TopicName, TopicId, MsgId, State)} + %% notice that this TopicName maybe normal or predefined, + %% involving the predefined topic name in register to + %% enhance the gateway's robustness even inconsistent + %% with MQTT-SN channel + {keep_state, + send_register(TopicName, TopicId, MsgId, State)} end; _ -> + %% XXX: We need to handle others error code + %% 'Rejection: congestion' ?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]), {keep_state, State} end. @@ -1050,7 +1065,7 @@ handle_incoming(Packet, _StName, State) -> channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) -> _ = inc_incoming_stats(Type), ok = emqx_metrics:inc_recv(Packet), - ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), + ?LOG(debug, "Transed-RECV ~s", [emqx_packet:format(Packet)]), emqx_channel:handle_in(Packet, Channel). handle_outgoing(Packets, State) when is_list(Packets) -> @@ -1064,7 +1079,9 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _), ClientId = emqx_channel:info(clientid, Channel), TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of - true -> register_and_notify_client(PubPkt, State); + true -> + %% TODO: only one REGISTER inflight if qos=0?? + register_and_notify_client(PubPkt, State); false -> send_message(mqtt2sn(PubPkt, State), State) end; @@ -1077,13 +1094,40 @@ cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) -> Msgs = maps:get(pending_topic_ids, Pendings, []), Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}. -replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State0) -> - ?LOG(debug, "replay non-registered publish message for topic-id: ~p, pendings: ~0p", - [TopicId, Pendings]), +replay_no_reg_pending_publishes(TopicId, + State0 = #state{ + pending_topic_ids = Pendings}) -> + ?LOG(debug, "replay non-registered qos0 publish message for " + "topic-id: ~p, pendings: ~0p", [TopicId, Pendings]), State = lists:foldl(fun(Msg, State1) -> send_message(Msg, State1) end, State0, maps:get(TopicId, Pendings, [])), - State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}. + + NState = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}, + case replay_inflight_messages(TopicId, State#state.channel) of + [] -> ok; + Outgoings -> + ?LOG(debug, "replay non-registered qos1/qos2 publish message " + "for topic-id: ~0p, messages: ~0p", + [TopicId, Outgoings]), + handle_outgoing(Outgoings, NState) + end. + +replay_inflight_messages(TopicId, Channel) -> + Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)), + + case emqx_inflight:to_list(Inflight) of + [] -> []; + [{PktId, {Msg, _Ts}}] -> %% Fixed inflight size 1 + ClientId = emqx_channel:info(clientid, Channel), + ReplayTopic = emqx_sn_registry:lookup_topic(ClientId, TopicId), + case ReplayTopic =:= emqx_message:topic(Msg) of + false -> []; + true -> + NMsg = emqx_message:set_flag(dup, true, Msg), + [emqx_message:to_packet(PktId, NMsg)] + end + end. register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt, State = #state{pending_topic_ids = Pendings, channel = Channel}) -> @@ -1091,10 +1135,17 @@ register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, ClientId = emqx_channel:info(clientid, Channel), TopicId = emqx_sn_registry:register_topic(ClientId, TopicName), - ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " - "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), - NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State), - send_register(TopicName, TopicId, MsgId, State#state{pending_topic_ids = NewPendings}). + ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, " + "QoS=~p,Retain=~p, MsgId=~p", + [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), + NPendings = case QoS == ?QOS_0 of + true -> + cache_no_reg_publish_message( + Pendings, TopicId, PubPkt, State); + _ -> Pendings + end, + send_register(TopicName, TopicId, MsgId, + State#state{pending_topic_ids = NPendings}). message_id(undefined) -> rand:uniform(16#FFFF); diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 1371c5123..9ff519fb5 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -819,6 +819,151 @@ t_publish_qos2_case03(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). +t_delivery_qos1_register_invalid_topic_id(_) -> + Dup = 0, + QoS = 1, + Retain = 0, + Will = 0, + CleanSession = 0, + MsgId = 1, + TopicId = ?MAX_PRED_TOPIC_ID + 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, + ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + Payload = <<"test-registration-inconsistent">>, + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"ab">>, Payload)), + + ?assertEqual( + <<(7 + byte_size(Payload)), ?SN_PUBLISH, + Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, MsgId:16, Payload/binary>>, receive_response(Socket)), + %% acked with ?SN_RC_INVALID_TOPIC_ID to + send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), + ?assertEqual( + {TopicId, MsgId}, + check_register_msg_on_udp(<<"ab">>, receive_response(Socket))), + send_regack_msg(Socket, TopicId, MsgId + 1), + + %% receive the replay message + ?assertEqual( + <<(7 + byte_size(Payload)), ?SN_PUBLISH, + Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, (MsgId):16, Payload/binary>>, receive_response(Socket)), + + send_disconnect_msg(Socket, undefined), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket). + +t_delivery_takeover_and_re_register(_) -> + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#00100000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% qos1 + + %% received the resume messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), + + %% qos2 + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB1), + <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB1), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + send_disconnect_msg(NSocket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), + gen_udp:close(NSocket). + t_will_case01(_) -> QoS = 1, Duration = 1, @@ -1591,13 +1736,16 @@ send_searchgw_msg(Socket) -> ok = gen_udp:send(Socket, ?HOST, ?PORT, <>). send_connect_msg(Socket, ClientId) -> + send_connect_msg(Socket, ClientId, 1). + +send_connect_msg(Socket, ClientId, CleanSession) when CleanSession == 0; + CleanSession == 1 -> Length = 6 + byte_size(ClientId), MsgType = ?SN_CONNECT, Dup = 0, QoS = 0, Retain = 0, Will = 0, - CleanSession = 1, TopicIdType = 0, ProtocolId = 1, Duration = 10, @@ -1713,9 +1861,12 @@ send_publish_msg_short_topic(Socket, QoS, MsgId, TopicName, Data) -> ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket). send_puback_msg(Socket, TopicId, MsgId) -> + send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED). + +send_puback_msg(Socket, TopicId, MsgId, Rc) -> Length = 7, MsgType = ?SN_PUBACK, - PubAckPacket = <>, + PubAckPacket = <>, ?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]), ok = gen_udp:send(Socket, ?HOST, ?PORT, PubAckPacket).