diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 83a9bfcf6..49bee148e 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -24,8 +24,8 @@ File format: ### Bug fixes * List subscription topic (/api/v4/subscriptions), the result do not match with multiple conditions. - * SSL closed error bug fixed for redis client. +* Fix mqtt-sn client disconnected due to re-send a duplicated qos2 message * Improved resilience against autocluster partitioning during cluster startup. [#7876] diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src index 319137fed..68e0340f8 100644 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ b/apps/emqx_sn/src/emqx_sn.app.src @@ -1,6 +1,6 @@ {application, emqx_sn, [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.3.6"}, % strict semver, bump manually! + {vsn, "4.3.7"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,esockd]}, diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 269605afa..bab8a2b7a 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,6 +1,9 @@ %% -*- mode: erlang -*- {VSN, [ + {"4.3.6",[ + {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + ]}, {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, @@ -26,6 +29,9 @@ {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ], [ + {"4.3.6",[ + {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + ]}, {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index a1b502d26..22fe51b6e 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -831,13 +831,15 @@ mqtt2sn(?CONNACK_PACKET(0, _SessPresent), _State) -> mqtt2sn(?CONNACK_PACKET(_ReturnCode, _SessPresent), _State) -> ?SN_CONNACK_MSG(?SN_RC_CONGESTION); -mqtt2sn(?PUBREC_PACKET(MsgId), _State) -> +mqtt2sn(?PUBACK_PACKET(MsgId, _ReasonCode), _State) -> + TopicIdFinal = get_topic_id(puback, MsgId), + ?SN_PUBACK_MSG(TopicIdFinal, MsgId, ?SN_RC_ACCEPTED); + +mqtt2sn(?PUBREC_PACKET(MsgId, _ReturnCode), _State) -> ?SN_PUBREC_MSG(?SN_PUBREC, MsgId); - -mqtt2sn(?PUBREL_PACKET(MsgId), _State) -> +mqtt2sn(?PUBREL_PACKET(MsgId, _ReturnCode), _State) -> ?SN_PUBREC_MSG(?SN_PUBREL, MsgId); - -mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) -> +mqtt2sn(?PUBCOMP_PACKET(MsgId, _ReturnCode), _State) -> ?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId); mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> @@ -884,11 +886,7 @@ mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)-> {?QOS_0, get_topic_id(suback, MsgId), ?SN_RC_NOT_SUPPORTED} end, Flags = #mqtt_sn_flags{qos = QoS}, - ?SN_SUBACK_MSG(Flags, TopicId, MsgId, NewReturnCode); - -mqtt2sn(?PUBACK_PACKET(MsgId, _ReasonCode), _State) -> - TopicIdFinal = get_topic_id(puback, MsgId), - ?SN_PUBACK_MSG(TopicIdFinal, MsgId, ?SN_RC_ACCEPTED). + ?SN_SUBACK_MSG(Flags, TopicId, MsgId, NewReturnCode). send_register(TopicName, TopicId, MsgId, State) -> send_message(?SN_REGISTER_MSG(TopicId, MsgId, TopicName), State). diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 3d509822f..4912b4fcf 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -36,7 +36,7 @@ -define(FLAG_RETAIN(X),X). -define(FLAG_SESSION(X),X). --define(LOG(Format, Args), ct:print("TEST: " ++ Format, Args)). +-define(LOG(Format, Args), ct:log("TEST: " ++ Format, Args)). -define(MAX_PRED_TOPIC_ID, 2). -define(PREDEF_TOPIC_ID1, 1). @@ -825,6 +825,42 @@ t_publish_qos2_case03(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). +t_publish_qos2_re_sent(_) -> + Dup = 0, + QoS = 2, + Retain = 0, + Will = 0, + CleanSession = 0, + MsgId = 7, + TopicId0 = 0, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, QoS, <<"/#">>, MsgId), + ?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + Payload1 = <<20, 21, 22, 23>>, + send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/a">>, Payload1), + ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)), + ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC :2, <<"/a">>/binary, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)), + + %% re-sent qos2 PUBLISH + send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/a">>, Payload1), + %% still receive PUBREC normally + ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)), + send_pubrel_msg(Socket, MsgId), + ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)), + + timer:sleep(100), + + send_disconnect_msg(Socket, undefined), + %% note: receiving DISCONNECT packet here means that the qos2 is not duplicated + %% publish to broker + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket). + t_delivery_qos1_register_invalid_topic_id(_) -> Dup = 0, QoS = 1,