Merge pull request #6970 from HJianBo/fix-no-register-msg-replaying
Fix the MQTT-SN message replay when the topic is not registered to the client
This commit is contained in:
commit
b2093409b6
|
@ -33,6 +33,7 @@ File format:
|
||||||
* Fix Server-KeepAlive wrongly applied on MQTT v3.0/v3.1 [#7085]
|
* Fix Server-KeepAlive wrongly applied on MQTT v3.0/v3.1 [#7085]
|
||||||
* Fix Stomp client can not trigger `$event/client_connection` message [#7096]
|
* Fix Stomp client can not trigger `$event/client_connection` message [#7096]
|
||||||
* Fix system memory false alarm at boot
|
* Fix system memory false alarm at boot
|
||||||
|
* Fix the MQTT-SN message replay when the topic is not registered to the client [#6970]
|
||||||
|
|
||||||
## v4.3.12
|
## v4.3.12
|
||||||
### Important changes
|
### Important changes
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_sn,
|
{application, emqx_sn,
|
||||||
[{description, "EMQ X MQTT-SN Plugin"},
|
[{description, "EMQ X MQTT-SN Plugin"},
|
||||||
{vsn, "4.3.5"}, % strict semver, bump manually!
|
{vsn, "4.3.6"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,esockd]},
|
{applications, [kernel,stdlib,esockd]},
|
||||||
|
|
|
@ -1,29 +1,25 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
{"4.3.4",[
|
{<<"4\\.3\\.[4-5]">>,[
|
||||||
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.3.3",[
|
{<<"4.3.[2-3]">>,[
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
|
||||||
]},
|
|
||||||
{"4.3.2", [
|
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
{"4.3.4",[
|
{<<"4\\.3\\.[4-5]">>,[
|
||||||
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.3.3",[
|
{<<"4.3.[2-3]">>,[
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
|
||||||
]},
|
|
||||||
{"4.3.2", [
|
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
||||||
|
|
|
@ -268,40 +268,81 @@ message_type(16#1d) ->
|
||||||
message_type(Type) ->
|
message_type(Type) ->
|
||||||
io_lib:format("Unknown Type ~p", [Type]).
|
io_lib:format("Unknown Type ~p", [Type]).
|
||||||
|
|
||||||
|
format(?SN_CONNECT_MSG(Flags, ProtocolId, Duration, ClientId)) ->
|
||||||
|
#mqtt_sn_flags{
|
||||||
|
will = Will,
|
||||||
|
clean_start = CleanStart} = Flags,
|
||||||
|
io_lib:format("SN_CONNECT(W~w, C~w, ProtocolId=~w, Duration=~w, "
|
||||||
|
"ClientId=~s)",
|
||||||
|
[bool(Will), bool(CleanStart),
|
||||||
|
ProtocolId, Duration, ClientId]);
|
||||||
|
format(?SN_CONNACK_MSG(ReturnCode)) ->
|
||||||
|
io_lib:format("SN_CONNACK(ReturnCode=~w)", [ReturnCode]);
|
||||||
|
format(?SN_WILLTOPICREQ_MSG()) ->
|
||||||
|
"SN_WILLTOPICREQ()";
|
||||||
|
format(?SN_WILLTOPIC_MSG(Flags, Topic)) ->
|
||||||
|
#mqtt_sn_flags{
|
||||||
|
qos = QoS,
|
||||||
|
retain = Retain} = Flags,
|
||||||
|
io_lib:format("SN_WILLTOPIC(Q~w, R~w, Topic=~s)",
|
||||||
|
[QoS, bool(Retain), Topic]);
|
||||||
|
format(?SN_WILLTOPIC_EMPTY_MSG) ->
|
||||||
|
"SN_WILLTOPIC(_)";
|
||||||
|
format(?SN_WILLMSGREQ_MSG()) ->
|
||||||
|
"SN_WILLMSGREQ()";
|
||||||
|
format(?SN_WILLMSG_MSG(Msg)) ->
|
||||||
|
io_lib:format("SN_WILLMSG_MSG(Msg=~p)", [Msg]);
|
||||||
format(?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)) ->
|
format(?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_PUBLISH, ~s, TopicId=~w, MsgId=~w, Payload=~w",
|
#mqtt_sn_flags{
|
||||||
[format_flag(Flags), TopicId, MsgId, Data]);
|
dup = Dup,
|
||||||
format(?SN_PUBACK_MSG(Flags, MsgId, ReturnCode)) ->
|
qos = QoS,
|
||||||
io_lib:format("mqtt_sn_message SN_PUBACK, ~s, MsgId=~w, ReturnCode=~w",
|
retain = Retain,
|
||||||
[format_flag(Flags), MsgId, ReturnCode]);
|
topic_id_type = TopicIdType} = Flags,
|
||||||
|
io_lib:format("SN_PUBLISH(D~w, Q~w, R~w, TopicIdType=~w, TopicId=~w, "
|
||||||
|
"MsgId=~w, Payload=~p)",
|
||||||
|
[bool(Dup), QoS, bool(Retain),
|
||||||
|
TopicIdType, TopicId, MsgId, Data]);
|
||||||
|
format(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)) ->
|
||||||
|
io_lib:format("SN_PUBACK(TopicId=~w, MsgId=~w, ReturnCode=~w)",
|
||||||
|
[TopicId, MsgId, ReturnCode]);
|
||||||
format(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)) ->
|
format(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_PUBCOMP, MsgId=~w", [MsgId]);
|
io_lib:format("SN_PUBCOMP(MsgId=~w)", [MsgId]);
|
||||||
format(?SN_PUBREC_MSG(?SN_PUBREC, MsgId)) ->
|
format(?SN_PUBREC_MSG(?SN_PUBREC, MsgId)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_PUBREC, MsgId=~w", [MsgId]);
|
io_lib:format("SN_PUBREC(MsgId=~w)", [MsgId]);
|
||||||
format(?SN_PUBREC_MSG(?SN_PUBREL, MsgId)) ->
|
format(?SN_PUBREC_MSG(?SN_PUBREL, MsgId)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_PUBREL, MsgId=~w", [MsgId]);
|
io_lib:format("SN_PUBREL(MsgId=~w)", [MsgId]);
|
||||||
format(?SN_SUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
|
format(?SN_SUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_SUBSCRIBE, ~s, MsgId=~w, TopicId=~w",
|
#mqtt_sn_flags{
|
||||||
[format_flag(Flags), Msgid, Topic]);
|
dup = Dup,
|
||||||
|
qos = QoS,
|
||||||
|
topic_id_type = TopicIdType} = Flags,
|
||||||
|
io_lib:format("SN_SUBSCRIBE(D~w, Q~w, TopicIdType=~w, MsgId=~w, "
|
||||||
|
"TopicId=~w)",
|
||||||
|
[bool(Dup), QoS, TopicIdType, Msgid, Topic]);
|
||||||
format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) ->
|
format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_SUBACK, ~s, MsgId=~w, TopicId=~w, ReturnCode=~w",
|
#mqtt_sn_flags{qos = QoS} = Flags,
|
||||||
[format_flag(Flags), MsgId, TopicId, ReturnCode]);
|
io_lib:format("SN_SUBACK(GrantedQoS=~w, MsgId=~w, TopicId=~w, "
|
||||||
|
"ReturnCode=~w)",
|
||||||
|
[QoS, MsgId, TopicId, ReturnCode]);
|
||||||
format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
|
format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_UNSUBSCRIBE, ~s, MsgId=~w, TopicId=~w",
|
#mqtt_sn_flags{topic_id_type = TopicIdType} = Flags,
|
||||||
[format_flag(Flags), Msgid, Topic]);
|
io_lib:format("SN_UNSUBSCRIBE(TopicIdType=~s, MsgId=~w, TopicId=~w)",
|
||||||
|
[TopicIdType, Msgid, Topic]);
|
||||||
format(?SN_UNSUBACK_MSG(MsgId)) ->
|
format(?SN_UNSUBACK_MSG(MsgId)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_UNSUBACK, MsgId=~w", [MsgId]);
|
io_lib:format("SN_UNSUBACK(MsgId=~w)", [MsgId]);
|
||||||
format(?SN_REGISTER_MSG(TopicId, MsgId, TopicName)) ->
|
format(?SN_REGISTER_MSG(TopicId, MsgId, TopicName)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_REGISTER, TopicId=~w, MsgId=~w, TopicName=~w",
|
io_lib:format("SN_REGISTER(TopicId=~w, MsgId=~w, TopicName=~s)",
|
||||||
[TopicId, MsgId, TopicName]);
|
[TopicId, MsgId, TopicName]);
|
||||||
format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) ->
|
format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_REGACK, TopicId=~w, MsgId=~w, ReturnCode=~w",
|
io_lib:format("SN_REGACK(TopicId=~w, MsgId=~w, ReturnCode=~w)",
|
||||||
[TopicId, MsgId, ReturnCode]);
|
[TopicId, MsgId, ReturnCode]);
|
||||||
|
format(?SN_PINGREQ_MSG(ClientId)) ->
|
||||||
|
io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]);
|
||||||
|
format(?SN_PINGRESP_MSG()) ->
|
||||||
|
"SN_PINGREQ()";
|
||||||
|
format(?SN_DISCONNECT_MSG(Duration)) ->
|
||||||
|
io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]);
|
||||||
|
|
||||||
format(#mqtt_sn_message{type = Type, variable = Var}) ->
|
format(#mqtt_sn_message{type = Type, variable = Var}) ->
|
||||||
io_lib:format("mqtt_sn_message type=~s, Var=~w", [emqx_sn_frame:message_type(Type), Var]).
|
io_lib:format("mqtt_sn_message(type=~s, Var=~w)",
|
||||||
|
[emqx_sn_frame:message_type(Type), Var]).
|
||||||
format_flag(#mqtt_sn_flags{dup = Dup, qos = QoS, retain = Retain, will = Will, clean_start = CleanStart, topic_id_type = TopicType}) ->
|
|
||||||
io_lib:format("mqtt_sn_flags{dup=~p, qos=~p, retain=~p, will=~p, clean_session=~p, topic_id_type=~p}",
|
|
||||||
[Dup, QoS, Retain, Will, CleanStart, TopicType]);
|
|
||||||
format_flag(_Flag) -> "invalid flag".
|
|
||||||
|
|
||||||
|
|
|
@ -94,6 +94,8 @@
|
||||||
idle_timeout :: integer(),
|
idle_timeout :: integer(),
|
||||||
enable_qos3 = false :: boolean(),
|
enable_qos3 = false :: boolean(),
|
||||||
has_pending_pingresp = false :: boolean(),
|
has_pending_pingresp = false :: boolean(),
|
||||||
|
%% Store all qos0 messages for waiting REGACK
|
||||||
|
%% Note: QoS1/QoS2 messages will kept inflight queue
|
||||||
pending_topic_ids = #{} :: pending_msgs()
|
pending_topic_ids = #{} :: pending_msgs()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -490,7 +492,7 @@ handle_event({call, From}, Req, _StateName, State) ->
|
||||||
{reply, Reply, NState} ->
|
{reply, Reply, NState} ->
|
||||||
gen_server:reply(From, Reply),
|
gen_server:reply(From, Reply),
|
||||||
{keep_state, NState};
|
{keep_state, NState};
|
||||||
{stop, Reason, Reply, NState} ->
|
{shutdown, Reason, Reply, NState} ->
|
||||||
State0 = case NState#state.sockstate of
|
State0 = case NState#state.sockstate of
|
||||||
running ->
|
running ->
|
||||||
send_message(?SN_DISCONNECT_MSG(undefined), NState);
|
send_message(?SN_DISCONNECT_MSG(undefined), NState);
|
||||||
|
@ -518,10 +520,9 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_event(info, {deliver, _Topic, Msg}, asleep,
|
handle_event(info, {deliver, _Topic, Msg}, asleep,
|
||||||
State = #state{channel = Channel, pending_topic_ids = Pendings}) ->
|
State = #state{channel = Channel}) ->
|
||||||
% section 6.14, Support of sleeping clients
|
% section 6.14, Support of sleeping clients
|
||||||
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p",
|
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]),
|
||||||
[Msg, Pendings]),
|
|
||||||
Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
|
Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
|
||||||
Msg, emqx_channel:get_session(Channel)),
|
Msg, emqx_channel:get_session(Channel)),
|
||||||
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
|
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
|
||||||
|
@ -610,7 +611,7 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
|
||||||
{reply, Reply, NChannel} ->
|
{reply, Reply, NChannel} ->
|
||||||
{reply, Reply, State#state{channel = NChannel}};
|
{reply, Reply, State#state{channel = NChannel}};
|
||||||
{shutdown, Reason, Reply, NChannel} ->
|
{shutdown, Reason, Reply, NChannel} ->
|
||||||
stop(Reason, Reply, State#state{channel = NChannel})
|
{shutdown, Reason, Reply, State#state{channel = NChannel}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) ->
|
handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) ->
|
||||||
|
@ -723,11 +724,19 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
|
||||||
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
|
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
|
||||||
?SN_UNSUBACK_MSG(MsgId);
|
?SN_UNSUBACK_MSG(MsgId);
|
||||||
|
|
||||||
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) ->
|
mqtt2sn(
|
||||||
NewPacketId = if QoS =:= ?QOS_0 -> 0;
|
#mqtt_packet{header = #mqtt_packet_header{
|
||||||
|
type = ?PUBLISH,
|
||||||
|
qos = QoS,
|
||||||
|
%dup = Dup,
|
||||||
|
retain = Retain},
|
||||||
|
variable = #mqtt_packet_publish{
|
||||||
|
topic_name = Topic,
|
||||||
|
packet_id = PacketId},
|
||||||
|
payload = Payload}, #state{clientid = ClientId}) ->
|
||||||
|
NPacketId = if QoS =:= ?QOS_0 -> 0;
|
||||||
true -> PacketId
|
true -> PacketId
|
||||||
end,
|
end,
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
|
||||||
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of
|
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of
|
||||||
{predef, PredefTopicId} ->
|
{predef, PredefTopicId} ->
|
||||||
{?SN_PREDEFINED_TOPIC, PredefTopicId};
|
{?SN_PREDEFINED_TOPIC, PredefTopicId};
|
||||||
|
@ -737,8 +746,12 @@ mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channe
|
||||||
{?SN_SHORT_TOPIC, Topic}
|
{?SN_SHORT_TOPIC, Topic}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType},
|
Flags = #mqtt_sn_flags{
|
||||||
?SN_PUBLISH_MSG(Flags, TopicContent, NewPacketId, Payload);
|
%dup = Dup,
|
||||||
|
qos = QoS,
|
||||||
|
retain = Retain,
|
||||||
|
topic_id_type = TopicIdType},
|
||||||
|
?SN_PUBLISH_MSG(Flags, TopicContent, NPacketId, Payload);
|
||||||
|
|
||||||
mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)->
|
mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)->
|
||||||
% if success, suback is sent by handle_info({suback, MsgId, [GrantedQoS]}, ...)
|
% if success, suback is sent by handle_info({suback, MsgId, [GrantedQoS]}, ...)
|
||||||
|
@ -766,9 +779,10 @@ send_connack(State) ->
|
||||||
|
|
||||||
send_message(Msg = #mqtt_sn_message{type = Type},
|
send_message(Msg = #mqtt_sn_message{type = Type},
|
||||||
State = #state{sockpid = SockPid, peername = Peername}) ->
|
State = #state{sockpid = SockPid, peername = Peername}) ->
|
||||||
?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]),
|
?LOG(info, "SEND ~s~n", [emqx_sn_frame:format(Msg)]),
|
||||||
inc_outgoing_stats(Type),
|
inc_outgoing_stats(Type),
|
||||||
Data = emqx_sn_frame:serialize(Msg),
|
Data = emqx_sn_frame:serialize(Msg),
|
||||||
|
?LOG(debug, "SEND ~0p", [Data]),
|
||||||
ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
|
ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
|
||||||
SockPid ! {datagram, Peername, Data},
|
SockPid ! {datagram, Peername, Data},
|
||||||
State.
|
State.
|
||||||
|
@ -793,13 +807,6 @@ stop(Reason, State) ->
|
||||||
maybe_send_will_msg(Reason, State),
|
maybe_send_will_msg(Reason, State),
|
||||||
{stop, {shutdown, Reason}, State}.
|
{stop, {shutdown, Reason}, State}.
|
||||||
|
|
||||||
stop({shutdown, Reason}, Reply, State) ->
|
|
||||||
stop(Reason, Reply, State);
|
|
||||||
stop(Reason, Reply, State) ->
|
|
||||||
?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
|
|
||||||
maybe_send_will_msg(Reason, State),
|
|
||||||
{stop, {shutdown, Reason}, Reply, State}.
|
|
||||||
|
|
||||||
maybe_send_will_msg(normal, _State) ->
|
maybe_send_will_msg(normal, _State) ->
|
||||||
ok;
|
ok;
|
||||||
maybe_send_will_msg(_Reason, State) ->
|
maybe_send_will_msg(_Reason, State) ->
|
||||||
|
@ -825,6 +832,9 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
|
||||||
%% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message
|
%% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message
|
||||||
%% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange
|
%% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange
|
||||||
%% before it could start a new level 1 or 2 transaction.
|
%% before it could start a new level 1 or 2 transaction.
|
||||||
|
%%
|
||||||
|
%% FIXME: But we should have a re-try timer to re-send the inflight
|
||||||
|
%% qos1/qos2 message
|
||||||
OnlyOneInflight = #{'Receive-Maximum' => 1},
|
OnlyOneInflight = #{'Receive-Maximum' => 1},
|
||||||
ConnPkt = #mqtt_packet_connect{clientid = ClientId,
|
ConnPkt = #mqtt_packet_connect{clientid = ClientId,
|
||||||
clean_start = CleanStart,
|
clean_start = CleanStart,
|
||||||
|
@ -973,11 +983,16 @@ do_puback(TopicId, MsgId, ReturnCode, StateName,
|
||||||
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
|
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
|
||||||
undefined -> {keep_state, State};
|
undefined -> {keep_state, State};
|
||||||
TopicName ->
|
TopicName ->
|
||||||
%%notice that this TopicName maybe normal or predefined,
|
%% notice that this TopicName maybe normal or predefined,
|
||||||
%% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels
|
%% involving the predefined topic name in register to
|
||||||
{keep_state, send_register(TopicName, TopicId, MsgId, State)}
|
%% enhance the gateway's robustness even inconsistent
|
||||||
|
%% with MQTT-SN channel
|
||||||
|
{keep_state,
|
||||||
|
send_register(TopicName, TopicId, MsgId, State)}
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
|
%% XXX: We need to handle others error code
|
||||||
|
%% 'Rejection: congestion'
|
||||||
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
|
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
|
||||||
{keep_state, State}
|
{keep_state, State}
|
||||||
end.
|
end.
|
||||||
|
@ -1050,7 +1065,7 @@ handle_incoming(Packet, _StName, State) ->
|
||||||
channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) ->
|
channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) ->
|
||||||
_ = inc_incoming_stats(Type),
|
_ = inc_incoming_stats(Type),
|
||||||
ok = emqx_metrics:inc_recv(Packet),
|
ok = emqx_metrics:inc_recv(Packet),
|
||||||
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
|
?LOG(debug, "Transed-RECV ~s", [emqx_packet:format(Packet)]),
|
||||||
emqx_channel:handle_in(Packet, Channel).
|
emqx_channel:handle_in(Packet, Channel).
|
||||||
|
|
||||||
handle_outgoing(Packets, State) when is_list(Packets) ->
|
handle_outgoing(Packets, State) when is_list(Packets) ->
|
||||||
|
@ -1064,7 +1079,9 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _),
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
|
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
|
||||||
case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
|
case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
|
||||||
true -> register_and_notify_client(PubPkt, State);
|
true ->
|
||||||
|
%% TODO: only one REGISTER inflight if qos=0??
|
||||||
|
register_and_notify_client(PubPkt, State);
|
||||||
false -> send_message(mqtt2sn(PubPkt, State), State)
|
false -> send_message(mqtt2sn(PubPkt, State), State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -1077,13 +1094,40 @@ cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) ->
|
||||||
Msgs = maps:get(pending_topic_ids, Pendings, []),
|
Msgs = maps:get(pending_topic_ids, Pendings, []),
|
||||||
Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}.
|
Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}.
|
||||||
|
|
||||||
replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State0) ->
|
replay_no_reg_pending_publishes(TopicId,
|
||||||
?LOG(debug, "replay non-registered publish message for topic-id: ~p, pendings: ~0p",
|
State0 = #state{
|
||||||
[TopicId, Pendings]),
|
pending_topic_ids = Pendings}) ->
|
||||||
|
?LOG(debug, "replay non-registered qos0 publish message for "
|
||||||
|
"topic-id: ~p, pendings: ~0p", [TopicId, Pendings]),
|
||||||
State = lists:foldl(fun(Msg, State1) ->
|
State = lists:foldl(fun(Msg, State1) ->
|
||||||
send_message(Msg, State1)
|
send_message(Msg, State1)
|
||||||
end, State0, maps:get(TopicId, Pendings, [])),
|
end, State0, maps:get(TopicId, Pendings, [])),
|
||||||
State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}.
|
|
||||||
|
NState = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)},
|
||||||
|
case replay_inflight_messages(TopicId, State#state.channel) of
|
||||||
|
[] -> ok;
|
||||||
|
Outgoings ->
|
||||||
|
?LOG(debug, "replay non-registered qos1/qos2 publish message "
|
||||||
|
"for topic-id: ~0p, messages: ~0p",
|
||||||
|
[TopicId, Outgoings]),
|
||||||
|
handle_outgoing(Outgoings, NState)
|
||||||
|
end.
|
||||||
|
|
||||||
|
replay_inflight_messages(TopicId, Channel) ->
|
||||||
|
Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)),
|
||||||
|
|
||||||
|
case emqx_inflight:to_list(Inflight) of
|
||||||
|
[] -> [];
|
||||||
|
[{PktId, {Msg, _Ts}}] -> %% Fixed inflight size 1
|
||||||
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
|
ReplayTopic = emqx_sn_registry:lookup_topic(ClientId, TopicId),
|
||||||
|
case ReplayTopic =:= emqx_message:topic(Msg) of
|
||||||
|
false -> [];
|
||||||
|
true ->
|
||||||
|
NMsg = emqx_message:set_flag(dup, true, Msg),
|
||||||
|
[emqx_message:to_packet(PktId, NMsg)]
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
|
register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
|
||||||
State = #state{pending_topic_ids = Pendings, channel = Channel}) ->
|
State = #state{pending_topic_ids = Pendings, channel = Channel}) ->
|
||||||
|
@ -1091,10 +1135,17 @@ register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) =
|
||||||
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
|
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
TopicId = emqx_sn_registry:register_topic(ClientId, TopicName),
|
TopicId = emqx_sn_registry:register_topic(ClientId, TopicName),
|
||||||
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
|
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, "
|
||||||
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
|
"QoS=~p,Retain=~p, MsgId=~p",
|
||||||
NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State),
|
[TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
|
||||||
send_register(TopicName, TopicId, MsgId, State#state{pending_topic_ids = NewPendings}).
|
NPendings = case QoS == ?QOS_0 of
|
||||||
|
true ->
|
||||||
|
cache_no_reg_publish_message(
|
||||||
|
Pendings, TopicId, PubPkt, State);
|
||||||
|
_ -> Pendings
|
||||||
|
end,
|
||||||
|
send_register(TopicName, TopicId, MsgId,
|
||||||
|
State#state{pending_topic_ids = NPendings}).
|
||||||
|
|
||||||
message_id(undefined) ->
|
message_id(undefined) ->
|
||||||
rand:uniform(16#FFFF);
|
rand:uniform(16#FFFF);
|
||||||
|
|
|
@ -819,6 +819,151 @@ t_publish_qos2_case03(_) ->
|
||||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_delivery_qos1_register_invalid_topic_id(_) ->
|
||||||
|
Dup = 0,
|
||||||
|
QoS = 1,
|
||||||
|
Retain = 0,
|
||||||
|
Will = 0,
|
||||||
|
CleanSession = 0,
|
||||||
|
MsgId = 1,
|
||||||
|
TopicId = ?MAX_PRED_TOPIC_ID + 1,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
send_connect_msg(Socket, <<"test">>),
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
|
||||||
|
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
|
||||||
|
?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||||
|
receive_response(Socket)),
|
||||||
|
|
||||||
|
Payload = <<"test-registration-inconsistent">>,
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"ab">>, Payload)),
|
||||||
|
|
||||||
|
?assertEqual(
|
||||||
|
<<(7 + byte_size(Payload)), ?SN_PUBLISH,
|
||||||
|
Dup:1, QoS:2, Retain:1,
|
||||||
|
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||||
|
TopicId:16, MsgId:16, Payload/binary>>, receive_response(Socket)),
|
||||||
|
%% acked with ?SN_RC_INVALID_TOPIC_ID to
|
||||||
|
send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID),
|
||||||
|
?assertEqual(
|
||||||
|
{TopicId, MsgId},
|
||||||
|
check_register_msg_on_udp(<<"ab">>, receive_response(Socket))),
|
||||||
|
send_regack_msg(Socket, TopicId, MsgId + 1),
|
||||||
|
|
||||||
|
%% receive the replay message
|
||||||
|
?assertEqual(
|
||||||
|
<<(7 + byte_size(Payload)), ?SN_PUBLISH,
|
||||||
|
Dup:1, QoS:2, Retain:1,
|
||||||
|
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||||
|
TopicId:16, (MsgId):16, Payload/binary>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_delivery_takeover_and_re_register(_) ->
|
||||||
|
MsgId = 1,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
send_connect_msg(Socket, <<"test">>, 0),
|
||||||
|
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
|
||||||
|
receive_response(Socket)),
|
||||||
|
|
||||||
|
send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
|
||||||
|
<<_, ?SN_SUBACK, 2#00100000,
|
||||||
|
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
|
||||||
|
|
||||||
|
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
|
||||||
|
<<_, ?SN_SUBACK, 2#01000000,
|
||||||
|
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
|
||||||
|
|
||||||
|
_ = emqx:publish(
|
||||||
|
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
|
||||||
|
_ = emqx:publish(
|
||||||
|
emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#00100000,
|
||||||
|
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
|
||||||
|
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#01000000,
|
||||||
|
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
|
||||||
|
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
|
gen_udp:close(Socket),
|
||||||
|
|
||||||
|
%% offline messages will be queued into the MQTT-SN session
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
|
||||||
|
|
||||||
|
{ok, NSocket} = gen_udp:open(0, [binary]),
|
||||||
|
send_connect_msg(NSocket, <<"test">>, 0),
|
||||||
|
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
|
||||||
|
receive_response(NSocket)),
|
||||||
|
|
||||||
|
%% qos1
|
||||||
|
|
||||||
|
%% received the resume messages
|
||||||
|
<<_, ?SN_PUBLISH, 2#00100000,
|
||||||
|
TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
|
||||||
|
%% only one qos1/qos2 inflight
|
||||||
|
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
|
||||||
|
send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
|
||||||
|
%% recv register
|
||||||
|
<<_, ?SN_REGISTER,
|
||||||
|
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
|
||||||
|
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
|
||||||
|
%% received the replay messages
|
||||||
|
<<_, ?SN_PUBLISH, 2#00100000,
|
||||||
|
TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
|
||||||
|
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#00100000,
|
||||||
|
TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
|
||||||
|
send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#00100000,
|
||||||
|
TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
|
||||||
|
send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
%% qos2
|
||||||
|
<<_, ?SN_PUBLISH, 2#01000000,
|
||||||
|
TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
|
||||||
|
%% only one qos1/qos2 inflight
|
||||||
|
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
|
||||||
|
send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
|
||||||
|
%% recv register
|
||||||
|
<<_, ?SN_REGISTER,
|
||||||
|
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
|
||||||
|
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
|
||||||
|
%% received the replay messages
|
||||||
|
<<_, ?SN_PUBLISH, 2#01000000,
|
||||||
|
TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
|
||||||
|
send_pubrec_msg(NSocket, MsgIdB1),
|
||||||
|
<<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
|
||||||
|
send_pubcomp_msg(NSocket, MsgIdB1),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#01000000,
|
||||||
|
TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
|
||||||
|
send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#01000000,
|
||||||
|
TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
|
||||||
|
send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
%% no more messages
|
||||||
|
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
|
||||||
|
|
||||||
|
send_disconnect_msg(NSocket, undefined),
|
||||||
|
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
|
||||||
|
gen_udp:close(NSocket).
|
||||||
|
|
||||||
t_will_case01(_) ->
|
t_will_case01(_) ->
|
||||||
QoS = 1,
|
QoS = 1,
|
||||||
Duration = 1,
|
Duration = 1,
|
||||||
|
@ -1591,13 +1736,16 @@ send_searchgw_msg(Socket) ->
|
||||||
ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>).
|
ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>).
|
||||||
|
|
||||||
send_connect_msg(Socket, ClientId) ->
|
send_connect_msg(Socket, ClientId) ->
|
||||||
|
send_connect_msg(Socket, ClientId, 1).
|
||||||
|
|
||||||
|
send_connect_msg(Socket, ClientId, CleanSession) when CleanSession == 0;
|
||||||
|
CleanSession == 1 ->
|
||||||
Length = 6 + byte_size(ClientId),
|
Length = 6 + byte_size(ClientId),
|
||||||
MsgType = ?SN_CONNECT,
|
MsgType = ?SN_CONNECT,
|
||||||
Dup = 0,
|
Dup = 0,
|
||||||
QoS = 0,
|
QoS = 0,
|
||||||
Retain = 0,
|
Retain = 0,
|
||||||
Will = 0,
|
Will = 0,
|
||||||
CleanSession = 1,
|
|
||||||
TopicIdType = 0,
|
TopicIdType = 0,
|
||||||
ProtocolId = 1,
|
ProtocolId = 1,
|
||||||
Duration = 10,
|
Duration = 10,
|
||||||
|
@ -1713,9 +1861,12 @@ send_publish_msg_short_topic(Socket, QoS, MsgId, TopicName, Data) ->
|
||||||
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket).
|
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket).
|
||||||
|
|
||||||
send_puback_msg(Socket, TopicId, MsgId) ->
|
send_puback_msg(Socket, TopicId, MsgId) ->
|
||||||
|
send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED).
|
||||||
|
|
||||||
|
send_puback_msg(Socket, TopicId, MsgId, Rc) ->
|
||||||
Length = 7,
|
Length = 7,
|
||||||
MsgType = ?SN_PUBACK,
|
MsgType = ?SN_PUBACK,
|
||||||
PubAckPacket = <<Length:8, MsgType:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED:8>>,
|
PubAckPacket = <<Length:8, MsgType:8, TopicId:16, MsgId:16, Rc:8>>,
|
||||||
?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]),
|
?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]),
|
||||||
ok = gen_udp:send(Socket, ?HOST, ?PORT, PubAckPacket).
|
ok = gen_udp:send(Socket, ?HOST, ?PORT, PubAckPacket).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue