Merge pull request #7960 from HJianBo/fix-mqttsn-qos2
fix(mqttsn): fix mqtt-sn client disconnected due to re-send a duplicated qos2 message
This commit is contained in:
commit
7c82b84293
|
@ -24,8 +24,8 @@ File format:
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
* List subscription topic (/api/v4/subscriptions), the result do not match with multiple conditions.
|
* List subscription topic (/api/v4/subscriptions), the result do not match with multiple conditions.
|
||||||
|
|
||||||
* SSL closed error bug fixed for redis client.
|
* SSL closed error bug fixed for redis client.
|
||||||
|
* Fix mqtt-sn client disconnected due to re-send a duplicated qos2 message
|
||||||
|
|
||||||
* Improved resilience against autocluster partitioning during cluster
|
* Improved resilience against autocluster partitioning during cluster
|
||||||
startup. [#7876]
|
startup. [#7876]
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_sn,
|
{application, emqx_sn,
|
||||||
[{description, "EMQ X MQTT-SN Plugin"},
|
[{description, "EMQ X MQTT-SN Plugin"},
|
||||||
{vsn, "4.3.6"}, % strict semver, bump manually!
|
{vsn, "4.3.7"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,esockd]},
|
{applications, [kernel,stdlib,esockd]},
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
|
{"4.3.6",[
|
||||||
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.3.5",[
|
{"4.3.5",[
|
||||||
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
||||||
|
@ -26,6 +29,9 @@
|
||||||
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
|
{"4.3.6",[
|
||||||
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.3.5",[
|
{"4.3.5",[
|
||||||
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -831,13 +831,15 @@ mqtt2sn(?CONNACK_PACKET(0, _SessPresent), _State) ->
|
||||||
mqtt2sn(?CONNACK_PACKET(_ReturnCode, _SessPresent), _State) ->
|
mqtt2sn(?CONNACK_PACKET(_ReturnCode, _SessPresent), _State) ->
|
||||||
?SN_CONNACK_MSG(?SN_RC_CONGESTION);
|
?SN_CONNACK_MSG(?SN_RC_CONGESTION);
|
||||||
|
|
||||||
mqtt2sn(?PUBREC_PACKET(MsgId), _State) ->
|
mqtt2sn(?PUBACK_PACKET(MsgId, _ReasonCode), _State) ->
|
||||||
|
TopicIdFinal = get_topic_id(puback, MsgId),
|
||||||
|
?SN_PUBACK_MSG(TopicIdFinal, MsgId, ?SN_RC_ACCEPTED);
|
||||||
|
|
||||||
|
mqtt2sn(?PUBREC_PACKET(MsgId, _ReturnCode), _State) ->
|
||||||
?SN_PUBREC_MSG(?SN_PUBREC, MsgId);
|
?SN_PUBREC_MSG(?SN_PUBREC, MsgId);
|
||||||
|
mqtt2sn(?PUBREL_PACKET(MsgId, _ReturnCode), _State) ->
|
||||||
mqtt2sn(?PUBREL_PACKET(MsgId), _State) ->
|
|
||||||
?SN_PUBREC_MSG(?SN_PUBREL, MsgId);
|
?SN_PUBREC_MSG(?SN_PUBREL, MsgId);
|
||||||
|
mqtt2sn(?PUBCOMP_PACKET(MsgId, _ReturnCode), _State) ->
|
||||||
mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
|
|
||||||
?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId);
|
?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId);
|
||||||
|
|
||||||
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
|
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
|
||||||
|
@ -884,11 +886,7 @@ mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)->
|
||||||
{?QOS_0, get_topic_id(suback, MsgId), ?SN_RC_NOT_SUPPORTED}
|
{?QOS_0, get_topic_id(suback, MsgId), ?SN_RC_NOT_SUPPORTED}
|
||||||
end,
|
end,
|
||||||
Flags = #mqtt_sn_flags{qos = QoS},
|
Flags = #mqtt_sn_flags{qos = QoS},
|
||||||
?SN_SUBACK_MSG(Flags, TopicId, MsgId, NewReturnCode);
|
?SN_SUBACK_MSG(Flags, TopicId, MsgId, NewReturnCode).
|
||||||
|
|
||||||
mqtt2sn(?PUBACK_PACKET(MsgId, _ReasonCode), _State) ->
|
|
||||||
TopicIdFinal = get_topic_id(puback, MsgId),
|
|
||||||
?SN_PUBACK_MSG(TopicIdFinal, MsgId, ?SN_RC_ACCEPTED).
|
|
||||||
|
|
||||||
send_register(TopicName, TopicId, MsgId, State) ->
|
send_register(TopicName, TopicId, MsgId, State) ->
|
||||||
send_message(?SN_REGISTER_MSG(TopicId, MsgId, TopicName), State).
|
send_message(?SN_REGISTER_MSG(TopicId, MsgId, TopicName), State).
|
||||||
|
|
|
@ -36,7 +36,7 @@
|
||||||
-define(FLAG_RETAIN(X),X).
|
-define(FLAG_RETAIN(X),X).
|
||||||
-define(FLAG_SESSION(X),X).
|
-define(FLAG_SESSION(X),X).
|
||||||
|
|
||||||
-define(LOG(Format, Args), ct:print("TEST: " ++ Format, Args)).
|
-define(LOG(Format, Args), ct:log("TEST: " ++ Format, Args)).
|
||||||
|
|
||||||
-define(MAX_PRED_TOPIC_ID, 2).
|
-define(MAX_PRED_TOPIC_ID, 2).
|
||||||
-define(PREDEF_TOPIC_ID1, 1).
|
-define(PREDEF_TOPIC_ID1, 1).
|
||||||
|
@ -825,6 +825,42 @@ t_publish_qos2_case03(_) ->
|
||||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_publish_qos2_re_sent(_) ->
|
||||||
|
Dup = 0,
|
||||||
|
QoS = 2,
|
||||||
|
Retain = 0,
|
||||||
|
Will = 0,
|
||||||
|
CleanSession = 0,
|
||||||
|
MsgId = 7,
|
||||||
|
TopicId0 = 0,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
send_connect_msg(Socket, <<"test">>),
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
send_subscribe_msg_normal_topic(Socket, QoS, <<"/#">>, MsgId),
|
||||||
|
?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||||
|
receive_response(Socket)),
|
||||||
|
|
||||||
|
Payload1 = <<20, 21, 22, 23>>,
|
||||||
|
send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/a">>, Payload1),
|
||||||
|
?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)),
|
||||||
|
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC :2, <<"/a">>/binary, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
%% re-sent qos2 PUBLISH
|
||||||
|
send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/a">>, Payload1),
|
||||||
|
%% still receive PUBREC normally
|
||||||
|
?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)),
|
||||||
|
send_pubrel_msg(Socket, MsgId),
|
||||||
|
?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
timer:sleep(100),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
%% note: receiving DISCONNECT packet here means that the qos2 is not duplicated
|
||||||
|
%% publish to broker
|
||||||
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
t_delivery_qos1_register_invalid_topic_id(_) ->
|
t_delivery_qos1_register_invalid_topic_id(_) ->
|
||||||
Dup = 0,
|
Dup = 0,
|
||||||
QoS = 1,
|
QoS = 1,
|
||||||
|
|
Loading…
Reference in New Issue