From 98edbc39af23d4cf777da2d9e62cacd9ff0f4537 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 16 Apr 2021 11:43:13 +0800 Subject: [PATCH] fix(emqx_sn): race_condition when discarding --- apps/emqx_sn/src/emqx_sn_gateway.erl | 51 ++++++++++++--------------- apps/emqx_sn/src/emqx_sn_registry.erl | 40 ++++++++++----------- 2 files changed, 43 insertions(+), 48 deletions(-) diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index bd343e038..2238393c5 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -202,7 +202,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, }, TopicId, _MsgId, Data)}, State = #state{clientid = ClientId, registry = Registry}) -> TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of - false -> emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId); + false -> emqx_sn_registry:lookup_topic(Registry, self(), TopicId); true -> <> end, _ = case TopicName =/= undefined of @@ -294,7 +294,7 @@ wait_for_will_msg(EventType, EventContent, State) -> connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)}, State = #state{clientid = ClientId, registry = Registry}) -> - case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of + case emqx_sn_registry:register_topic(Registry, self(), TopicName) of TopicId when is_integer(TopicId) -> ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]), send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State); @@ -586,10 +586,9 @@ handle_event(EventType, EventContent, StateName, State) -> [StateName, {EventType, EventContent}]), {keep_state, State}. -terminate(Reason, _StateName, #state{clientid = ClientId, - channel = Channel, +terminate(Reason, _StateName, #state{channel = Channel, registry = Registry}) -> - emqx_sn_registry:unregister_topic(Registry, ClientId), + emqx_sn_registry:unregister_topic(Registry, self()), case Channel =:= undefined of true -> ok; false -> emqx_channel:terminate(Reason, Channel) @@ -724,12 +723,10 @@ mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> ?SN_UNSUBACK_MSG(MsgId); mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry}) -> - NewPacketId = if - QoS =:= ?QOS_0 -> 0; - true -> PacketId + NewPacketId = if QoS =:= ?QOS_0 -> 0; + true -> PacketId end, - ClientId = get(clientid), - {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of + {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, self(), Topic) of {predef, PredefTopicId} -> {?SN_PREDEFINED_TOPIC, PredefTopicId}; TopicId when is_integer(TopicId) -> @@ -830,7 +827,6 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> keepalive = Duration, properties = OnlyOneInflight }, - put(clientid, ClientId), case WillFlag of true -> send_message(?SN_WILLTOPICREQ_MSG(), State), NState = State#state{connpkt = ConnPkt, @@ -847,7 +843,6 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname, peername = Peername, - clientid = OldClientId, registry = Registry, channel = Channel}) -> emqx_logger:set_metadata_clientid(ClientId), @@ -855,7 +850,7 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname, NChannel = case CleanStart of true -> emqx_channel:terminate(normal, Channel), - emqx_sn_registry:unregister_topic(Registry, OldClientId), + emqx_sn_registry:unregister_topic(Registry, self()), emqx_channel:init(#{socktype => udp, sockname => Sockname, peername => Peername, @@ -868,8 +863,8 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname, do_connect(ClientId, CleanStart, Will, Duration, NState). handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, - State=#state{clientid = ClientId, registry = Registry}) -> - case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of + State=#state{registry = Registry}) -> + case emqx_sn_registry:register_topic(Registry, self(), TopicName) of {error, too_large} -> ok = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, ?SN_INVALID_TOPIC_ID, @@ -883,8 +878,8 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, end; handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId, - State = #state{clientid = ClientId, registry = Registry}) -> - case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of + State = #state{registry = Registry}) -> + case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of undefined -> ok = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, TopicId, @@ -913,8 +908,8 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) -> proto_unsubscribe(TopicId, MsgId, State); handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId, - State = #state{clientid = ClientId, registry = Registry}) -> - case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of + State = #state{registry = Registry}) -> + case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of undefined -> ok = send_message(?SN_UNSUBACK_MSG(MsgId), State), {keep_state, State}; @@ -938,10 +933,10 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) -> <> = TopicName, do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State); do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, - State=#state{clientid = ClientId, registry = Registry}) -> + State=#state{registry = Registry}) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, NewQoS = get_corrected_qos(QoS), - case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of undefined -> (NewQoS =/= ?QOS_0) andalso send_message(?SN_PUBACK_MSG(TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), State), {keep_state, State}; @@ -979,12 +974,12 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) -> ok. do_puback(TopicId, MsgId, ReturnCode, StateName, - State=#state{clientid = ClientId, registry = Registry}) -> + State=#state{registry = Registry}) -> case ReturnCode of ?SN_RC_ACCEPTED -> handle_incoming(?PUBACK_PACKET(MsgId), StateName, State); ?SN_RC_INVALID_TOPIC_ID -> - case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of undefined -> ok; TopicName -> %%notice that this TopicName maybe normal or predefined, @@ -1072,24 +1067,24 @@ handle_outgoing(Packets, State) when is_list(Packets) -> lists:foreach(fun(Packet) -> handle_outgoing(Packet, State) end, Packets); handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload), - State = #state{clientid = ClientId, registry = Registry}) -> + State = #state{registry = Registry}) -> #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, MsgId = message_id(PacketId), ?LOG(debug, "Handle outgoing: ~0p", [PubPkt]), - (emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) == undefined) + (emqx_sn_registry:lookup_topic_id(Registry, self(), TopicName) == undefined) andalso (byte_size(TopicName) =/= 2) andalso register_and_notify_client(TopicName, Payload, Dup, QoS, - Retain, MsgId, ClientId, State), + Retain, MsgId, State), send_message(mqtt2sn(PubPkt, State), State); handle_outgoing(Packet, State) -> send_message(mqtt2sn(Packet, State), State). -register_and_notify_client(TopicName, Payload, Dup, QoS, Retain, MsgId, ClientId, +register_and_notify_client(TopicName, Payload, Dup, QoS, Retain, MsgId, State = #state{registry = Registry}) -> - TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName), + TopicId = emqx_sn_registry:register_topic(Registry, self(), TopicName), ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), send_register(TopicName, TopicId, MsgId, State). diff --git a/apps/emqx_sn/src/emqx_sn_registry.erl b/apps/emqx_sn/src/emqx_sn_registry.erl index 78e27ae01..d65005072 100644 --- a/apps/emqx_sn/src/emqx_sn_registry.erl +++ b/apps/emqx_sn/src/emqx_sn_registry.erl @@ -61,10 +61,10 @@ stop({_Tab, Pid}) -> gen_server:stop(Pid, normal, infinity). -spec(register_topic(registry(), binary(), binary()) -> integer() | {error, term()}). -register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> +register_topic({_, Pid}, ClientPid, TopicName) when is_binary(TopicName) -> case emqx_topic:wildcard(TopicName) of false -> - gen_server:call(Pid, {register, ClientId, TopicName}); + gen_server:call(Pid, {register, ClientPid, TopicName}); %% TopicId: in case of “accepted” the value that will be used as topic %% id by the gateway when sending PUBLISH messages to the client (not %% relevant in case of subscriptions to a short topic name or to a topic @@ -73,10 +73,10 @@ register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> end. -spec(lookup_topic(registry(), binary(), pos_integer()) -> undefined | binary()). -lookup_topic({Tab, _Pid}, ClientId, TopicId) when is_integer(TopicId) -> +lookup_topic({Tab, _Pid}, ClientPid, TopicId) when is_integer(TopicId) -> case lookup_element(Tab, {predef, TopicId}, 2) of undefined -> - lookup_element(Tab, {ClientId, TopicId}, 2); + lookup_element(Tab, {ClientPid, TopicId}, 2); Topic -> Topic end. @@ -84,10 +84,10 @@ lookup_topic({Tab, _Pid}, ClientId, TopicId) when is_integer(TopicId) -> -> undefined | pos_integer() | {predef, integer()}). -lookup_topic_id({Tab, _Pid}, ClientId, TopicName) when is_binary(TopicName) -> +lookup_topic_id({Tab, _Pid}, ClientPid, TopicName) when is_binary(TopicName) -> case lookup_element(Tab, {predef, TopicName}, 2) of undefined -> - lookup_element(Tab, {ClientId, TopicName}, 2); + lookup_element(Tab, {ClientPid, TopicName}, 2); TopicId -> {predef, TopicId} end. @@ -97,16 +97,16 @@ lookup_element(Tab, Key, Pos) -> try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end. -spec(unregister_topic(registry(), binary()) -> ok). -unregister_topic({_Tab, Pid}, ClientId) -> - gen_server:call(Pid, {unregister, ClientId}). +unregister_topic({_Tab, Pid}, ClientPid) -> + gen_server:call(Pid, {unregister, ClientPid}). %%----------------------------------------------------------------------------- init([Tab, PredefTopics]) -> %% {predef, TopicId} -> TopicName %% {predef, TopicName} -> TopicId - %% {ClientId, TopicId} -> TopicName - %% {ClientId, TopicName} -> TopicId + %% {ClientPid, TopicId} -> TopicName + %% {ClientPid, TopicName} -> TopicId _ = ets:new(Tab, [set, public, named_table, {read_concurrency, true}]), MaxPredefId = lists:foldl( fun({TopicId, TopicName}, AccId) -> @@ -116,27 +116,27 @@ init([Tab, PredefTopics]) -> end, 0, PredefTopics), {ok, #state{tab = Tab, max_predef_topic_id = MaxPredefId}}. -handle_call({register, ClientId, TopicName}, _From, +handle_call({register, ClientPid, TopicName}, _From, State = #state{tab = Tab, max_predef_topic_id = PredefId}) -> - case lookup_topic_id({Tab, self()}, ClientId, TopicName) of + case lookup_topic_id({Tab, self()}, ClientPid, TopicName) of {predef, PredefTopicId} when is_integer(PredefTopicId) -> {reply, PredefTopicId, State}; TopicId when is_integer(TopicId) -> {reply, TopicId, State}; undefined -> - case next_topic_id(Tab, PredefId, ClientId) of + case next_topic_id(Tab, PredefId, ClientPid) of TopicId when TopicId >= 16#FFFF -> {reply, {error, too_large}, State}; TopicId -> - _ = ets:insert(Tab, {{ClientId, next_topic_id}, TopicId + 1}), - _ = ets:insert(Tab, {{ClientId, TopicName}, TopicId}), - _ = ets:insert(Tab, {{ClientId, TopicId}, TopicName}), + _ = ets:insert(Tab, {{ClientPid, next_topic_id}, TopicId + 1}), + _ = ets:insert(Tab, {{ClientPid, TopicName}, TopicId}), + _ = ets:insert(Tab, {{ClientPid, TopicId}, TopicName}), {reply, TopicId, State} end end; -handle_call({unregister, ClientId}, _From, State = #state{tab = Tab}) -> - ets:match_delete(Tab, {{ClientId, '_'}, '_'}), +handle_call({unregister, ClientPid}, _From, State = #state{tab = Tab}) -> + ets:match_delete(Tab, {{ClientPid, '_'}, '_'}), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -159,8 +159,8 @@ code_change(_OldVsn, State, _Extra) -> %%----------------------------------------------------------------------------- -next_topic_id(Tab, PredefId, ClientId) -> - case ets:lookup(Tab, {ClientId, next_topic_id}) of +next_topic_id(Tab, PredefId, ClientPid) -> + case ets:lookup(Tab, {ClientPid, next_topic_id}) of [{_, Id}] -> Id; [] -> PredefId + 1 end.