fix(mqttsn): fix mqtt-sn client disconnected due to re-send a duplicated qos2 message

This commit is contained in:
JianBo He 2022-05-16 13:51:47 +08:00
parent 61014e5088
commit 7357ee5918
5 changed files with 53 additions and 13 deletions

View File

@ -23,8 +23,8 @@ File format:
### Bug fixes
* List subscription topic (/api/v4/subscriptions), the result do not match with multiple conditions.
* SSL closed error bug fixed for redis client.
* Fix mqtt-sn client disconnected due to re-send a duplicated qos2 message
* Improved resilience against autocluster partitioning during cluster
startup. [#7876]

View File

@ -1,6 +1,6 @@
{application, emqx_sn,
[{description, "EMQ X MQTT-SN Plugin"},
{vsn, "4.3.6"}, % strict semver, bump manually!
{vsn, "4.3.7"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,esockd]},

View File

@ -1,6 +1,9 @@
%% -*- mode: erlang -*-
{VSN,
[
{"4.3.6",[
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{"4.3.5",[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
@ -26,6 +29,9 @@
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
],
[
{"4.3.6",[
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{"4.3.5",[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},

View File

@ -831,13 +831,15 @@ mqtt2sn(?CONNACK_PACKET(0, _SessPresent), _State) ->
mqtt2sn(?CONNACK_PACKET(_ReturnCode, _SessPresent), _State) ->
?SN_CONNACK_MSG(?SN_RC_CONGESTION);
mqtt2sn(?PUBREC_PACKET(MsgId), _State) ->
mqtt2sn(?PUBACK_PACKET(MsgId, _ReasonCode), _State) ->
TopicIdFinal = get_topic_id(puback, MsgId),
?SN_PUBACK_MSG(TopicIdFinal, MsgId, ?SN_RC_ACCEPTED);
mqtt2sn(?PUBREC_PACKET(MsgId, _ReturnCode), _State) ->
?SN_PUBREC_MSG(?SN_PUBREC, MsgId);
mqtt2sn(?PUBREL_PACKET(MsgId), _State) ->
mqtt2sn(?PUBREL_PACKET(MsgId, _ReturnCode), _State) ->
?SN_PUBREC_MSG(?SN_PUBREL, MsgId);
mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
mqtt2sn(?PUBCOMP_PACKET(MsgId, _ReturnCode), _State) ->
?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId);
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
@ -884,11 +886,7 @@ mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)->
{?QOS_0, get_topic_id(suback, MsgId), ?SN_RC_NOT_SUPPORTED}
end,
Flags = #mqtt_sn_flags{qos = QoS},
?SN_SUBACK_MSG(Flags, TopicId, MsgId, NewReturnCode);
mqtt2sn(?PUBACK_PACKET(MsgId, _ReasonCode), _State) ->
TopicIdFinal = get_topic_id(puback, MsgId),
?SN_PUBACK_MSG(TopicIdFinal, MsgId, ?SN_RC_ACCEPTED).
?SN_SUBACK_MSG(Flags, TopicId, MsgId, NewReturnCode).
send_register(TopicName, TopicId, MsgId, State) ->
send_message(?SN_REGISTER_MSG(TopicId, MsgId, TopicName), State).

View File

@ -36,7 +36,7 @@
-define(FLAG_RETAIN(X),X).
-define(FLAG_SESSION(X),X).
-define(LOG(Format, Args), ct:print("TEST: " ++ Format, Args)).
-define(LOG(Format, Args), ct:log("TEST: " ++ Format, Args)).
-define(MAX_PRED_TOPIC_ID, 2).
-define(PREDEF_TOPIC_ID1, 1).
@ -825,6 +825,42 @@ t_publish_qos2_case03(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_publish_qos2_re_sent(_) ->
Dup = 0,
QoS = 2,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 7,
TopicId0 = 0,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, <<"/#">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/a">>, Payload1),
?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)),
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC :2, <<"/a">>/binary, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)),
%% re-sent qos2 PUBLISH
send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/a">>, Payload1),
%% still receive PUBREC normally
?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)),
send_pubrel_msg(Socket, MsgId),
?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)),
timer:sleep(100),
send_disconnect_msg(Socket, undefined),
%% note: receiving DISCONNECT packet here means that the qos2 is not duplicated
%% publish to broker
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_delivery_qos1_register_invalid_topic_id(_) ->
Dup = 0,
QoS = 1,