fix(mqtt-sn): fix clean session false reconnect topic-id badmatch

This commit is contained in:
Turtle 2021-06-01 15:55:05 +08:00 committed by turtleDeng
parent 5737daeb43
commit 5068678eea
4 changed files with 69 additions and 60 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_sn,
[{description, "EMQ X MQTT-SN Plugin"},
{vsn, "4.3.1"}, % strict semver, bump manually!
{vsn, "4.3.2"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,esockd]},

View File

@ -1,17 +1,13 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.0", [
{load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []},
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]}
]},
{<<".*">>, []}
{<<"4.3.[0-1]">>, [
{restart_application, emqx_sn}
]}
],
[
{"4.3.0", [
{load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []},
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]}
]},
{<<".*">>, []}
{<<"4.3.[0-1]">>, [
{restart_application, emqx_sn}
]}
]
}.

View File

@ -207,7 +207,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
}, TopicId, _MsgId, Data)},
State = #state{clientid = ClientId, registry = Registry}) ->
TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of
false -> emqx_sn_registry:lookup_topic(Registry, self(), TopicId);
false -> emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId);
true -> <<TopicId:16>>
end,
_ = case TopicName =/= undefined of
@ -294,7 +294,7 @@ wait_for_will_msg(EventType, EventContent, State) ->
connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)},
State = #state{clientid = ClientId, registry = Registry}) ->
State0 =
case emqx_sn_registry:register_topic(Registry, self(), TopicName) of
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
TopicId when is_integer(TopicId) ->
?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]),
send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State);
@ -582,11 +582,15 @@ handle_event(EventType, EventContent, StateName, State) ->
terminate(Reason, _StateName, #state{channel = Channel,
registry = Registry}) ->
emqx_sn_registry:unregister_topic(Registry, self()),
case Channel =:= undefined of
true -> ok;
false -> emqx_channel:terminate(Reason, Channel)
end.
ClientId = emqx_channel:info(clientid, Channel),
case Reason of
{shutdown, takeovered} ->
ok;
_ ->
emqx_sn_registry:unregister_topic(Registry, ClientId)
end,
emqx_channel:terminate(Reason, Channel),
ok.
code_change(_Vsn, StateName, State, _Extra) ->
{ok, StateName, State}.
@ -719,11 +723,13 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
?SN_UNSUBACK_MSG(MsgId);
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry}) ->
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry,
channel = Channel}) ->
NewPacketId = if QoS =:= ?QOS_0 -> 0;
true -> PacketId
end,
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, self(), Topic) of
ClientId = emqx_channel:info(clientid, Channel),
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of
{predef, PredefTopicId} ->
{?SN_PREDEFINED_TOPIC, PredefTopicId};
TopicId when is_integer(TopicId) ->
@ -851,7 +857,7 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
NChannel = case CleanStart of
true ->
emqx_channel:terminate(normal, Channel),
emqx_sn_registry:unregister_topic(Registry, self()),
emqx_sn_registry:unregister_topic(Registry, ClientId),
emqx_channel:init(#{socktype => udp,
sockname => Sockname,
peername => Peername,
@ -864,8 +870,9 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
do_connect(ClientId, CleanStart, Will, Duration, NState).
handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
State=#state{registry = Registry}) ->
case emqx_sn_registry:register_topic(Registry, self(), TopicName) of
State=#state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
{error, too_large} ->
State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
?SN_INVALID_TOPIC_ID,
@ -879,8 +886,9 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
end;
handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId,
State = #state{registry = Registry}) ->
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
State = #state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined ->
State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
TopicId,
@ -909,8 +917,9 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) ->
proto_unsubscribe(TopicId, MsgId, State);
handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId,
State = #state{registry = Registry}) ->
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
State = #state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined ->
{keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)};
PredefinedTopic ->
@ -932,10 +941,11 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) ->
<<TopicId:16>> = TopicName,
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State);
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
State=#state{registry = Registry}) ->
State=#state{registry = Registry, channel = Channel}) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS),
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined ->
{keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID,
State)};
@ -946,7 +956,7 @@ do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
do_publish(?SN_SHORT_TOPIC, STopicName, Data, Flags, MsgId, State) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS),
<<TopicId:16>> = STopicName ,
<<TopicId:16>> = STopicName,
case emqx_topic:wildcard(STopicName) of
true ->
{keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_NOT_SUPPORTED,
@ -974,12 +984,13 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) ->
ok.
do_puback(TopicId, MsgId, ReturnCode, StateName,
State=#state{registry = Registry}) ->
State=#state{registry = Registry, channel = Channel}) ->
case ReturnCode of
?SN_RC_ACCEPTED ->
handle_incoming(?PUBACK_PACKET(MsgId), StateName, State);
?SN_RC_INVALID_TOPIC_ID ->
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined -> {keep_state, State};
TopicName ->
%%notice that this TopicName maybe normal or predefined,
@ -1068,9 +1079,10 @@ handle_outgoing(Packets, State) when is_list(Packets) ->
end, State, Packets);
handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _),
State = #state{registry = Registry}) ->
State = #state{registry = Registry, channel = Channel}) ->
?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]),
TopicId = emqx_sn_registry:lookup_topic_id(Registry, self(), TopicName),
ClientId = emqx_channel:info(clientid, Channel),
TopicId = emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName),
case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
true -> register_and_notify_client(PubPkt, State);
false -> send_message(mqtt2sn(PubPkt, State), State)
@ -1094,10 +1106,11 @@ replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} =
State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}.
register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
State = #state{registry = Registry, pending_topic_ids = Pendings}) ->
State = #state{registry = Registry, pending_topic_ids = Pendings, channel = Channel}) ->
MsgId = message_id(PacketId),
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
TopicId = emqx_sn_registry:register_topic(Registry, self(), TopicName),
ClientId = emqx_channel:info(clientid, Channel),
TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName),
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State),

View File

@ -60,11 +60,11 @@ start_link(Tab, PredefTopics) ->
stop({_Tab, Pid}) ->
gen_server:stop(Pid, normal, infinity).
-spec(register_topic(registry(), pid(), binary()) -> integer() | {error, term()}).
register_topic({_, Pid}, ClientPid, TopicName) when is_binary(TopicName) ->
-spec(register_topic(registry(), binary(), binary()) -> integer() | {error, term()}).
register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) ->
case emqx_topic:wildcard(TopicName) of
false ->
gen_server:call(Pid, {register, ClientPid, TopicName});
gen_server:call(Pid, {register, ClientId, TopicName});
%% TopicId: in case of accepted the value that will be used as topic
%% id by the gateway when sending PUBLISH messages to the client (not
%% relevant in case of subscriptions to a short topic name or to a topic
@ -72,22 +72,22 @@ register_topic({_, Pid}, ClientPid, TopicName) when is_binary(TopicName) ->
true -> {error, wildcard_topic}
end.
-spec(lookup_topic(registry(), pid(), pos_integer()) -> undefined | binary()).
lookup_topic({Tab, _Pid}, ClientPid, TopicId) when is_integer(TopicId) ->
-spec(lookup_topic(registry(), binary(), pos_integer()) -> undefined | binary()).
lookup_topic({Tab, _Pid}, ClientId, TopicId) when is_integer(TopicId) ->
case lookup_element(Tab, {predef, TopicId}, 2) of
undefined ->
lookup_element(Tab, {ClientPid, TopicId}, 2);
lookup_element(Tab, {ClientId, TopicId}, 2);
Topic -> Topic
end.
-spec(lookup_topic_id(registry(), pid(), binary())
-spec(lookup_topic_id(registry(), binary(), binary())
-> undefined
| pos_integer()
| {predef, integer()}).
lookup_topic_id({Tab, _Pid}, ClientPid, TopicName) when is_binary(TopicName) ->
lookup_topic_id({Tab, _Pid}, ClientId, TopicName) when is_binary(TopicName) ->
case lookup_element(Tab, {predef, TopicName}, 2) of
undefined ->
lookup_element(Tab, {ClientPid, TopicName}, 2);
lookup_element(Tab, {ClientId, TopicName}, 2);
TopicId ->
{predef, TopicId}
end.
@ -96,17 +96,17 @@ lookup_topic_id({Tab, _Pid}, ClientPid, TopicName) when is_binary(TopicName) ->
lookup_element(Tab, Key, Pos) ->
try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end.
-spec(unregister_topic(registry(), pid()) -> ok).
unregister_topic({_Tab, Pid}, ClientPid) ->
gen_server:call(Pid, {unregister, ClientPid}).
-spec(unregister_topic(registry(), binary()) -> ok).
unregister_topic({_Tab, Pid}, ClientId) ->
gen_server:call(Pid, {unregister, ClientId}).
%%-----------------------------------------------------------------------------
init([Tab, PredefTopics]) ->
%% {predef, TopicId} -> TopicName
%% {predef, TopicName} -> TopicId
%% {ClientPid, TopicId} -> TopicName
%% {ClientPid, TopicName} -> TopicId
%% {ClientId, TopicId} -> TopicName
%% {ClientId, TopicName} -> TopicId
_ = ets:new(Tab, [set, public, named_table, {read_concurrency, true}]),
MaxPredefId = lists:foldl(
fun({TopicId, TopicName}, AccId) ->
@ -116,27 +116,27 @@ init([Tab, PredefTopics]) ->
end, 0, PredefTopics),
{ok, #state{tab = Tab, max_predef_topic_id = MaxPredefId}}.
handle_call({register, ClientPid, TopicName}, _From,
handle_call({register, ClientId, TopicName}, _From,
State = #state{tab = Tab, max_predef_topic_id = PredefId}) ->
case lookup_topic_id({Tab, self()}, ClientPid, TopicName) of
case lookup_topic_id({Tab, self()}, ClientId, TopicName) of
{predef, PredefTopicId} when is_integer(PredefTopicId) ->
{reply, PredefTopicId, State};
TopicId when is_integer(TopicId) ->
{reply, TopicId, State};
undefined ->
case next_topic_id(Tab, PredefId, ClientPid) of
case next_topic_id(Tab, PredefId, ClientId) of
TopicId when TopicId >= 16#FFFF ->
{reply, {error, too_large}, State};
TopicId ->
_ = ets:insert(Tab, {{ClientPid, next_topic_id}, TopicId + 1}),
_ = ets:insert(Tab, {{ClientPid, TopicName}, TopicId}),
_ = ets:insert(Tab, {{ClientPid, TopicId}, TopicName}),
_ = ets:insert(Tab, {{ClientId, next_topic_id}, TopicId + 1}),
_ = ets:insert(Tab, {{ClientId, TopicName}, TopicId}),
_ = ets:insert(Tab, {{ClientId, TopicId}, TopicName}),
{reply, TopicId, State}
end
end;
handle_call({unregister, ClientPid}, _From, State = #state{tab = Tab}) ->
ets:match_delete(Tab, {{ClientPid, '_'}, '_'}),
handle_call({unregister, ClientId}, _From, State = #state{tab = Tab}) ->
ets:match_delete(Tab, {{ClientId, '_'}, '_'}),
{reply, ok, State};
handle_call(Req, _From, State) ->
@ -159,8 +159,8 @@ code_change(_OldVsn, State, _Extra) ->
%%-----------------------------------------------------------------------------
next_topic_id(Tab, PredefId, ClientPid) ->
case ets:lookup(Tab, {ClientPid, next_topic_id}) of
next_topic_id(Tab, PredefId, ClientId) ->
case ets:lookup(Tab, {ClientId, next_topic_id}) of
[{_, Id}] -> Id;
[] -> PredefId + 1
end.