diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src index 3d7db1b02..82ec3b9fb 100644 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ b/apps/emqx_sn/src/emqx_sn.app.src @@ -1,6 +1,6 @@ {application, emqx_sn, [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.3.2"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,esockd]}, diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 499664fe1..a6a1a6c84 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,17 +1,13 @@ %% -*-: erlang -*- {VSN, [ - {"4.3.0", [ - {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []}, - {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]} - ]}, - {<<".*">>, []} + {<<"4.3.[0-1]">>, [ + {restart_application, emqx_sn} + ]} ], [ - {"4.3.0", [ - {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []}, - {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]} - ]}, - {<<".*">>, []} + {<<"4.3.[0-1]">>, [ + {restart_application, emqx_sn} + ]} ] }. diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 96f849974..66022100e 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -207,7 +207,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, }, TopicId, _MsgId, Data)}, State = #state{clientid = ClientId, registry = Registry}) -> TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of - false -> emqx_sn_registry:lookup_topic(Registry, self(), TopicId); + false -> emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId); true -> <> end, _ = case TopicName =/= undefined of @@ -294,7 +294,7 @@ wait_for_will_msg(EventType, EventContent, State) -> connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)}, State = #state{clientid = ClientId, registry = Registry}) -> State0 = - case emqx_sn_registry:register_topic(Registry, self(), TopicName) of + case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of TopicId when is_integer(TopicId) -> ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]), send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State); @@ -582,11 +582,15 @@ handle_event(EventType, EventContent, StateName, State) -> terminate(Reason, _StateName, #state{channel = Channel, registry = Registry}) -> - emqx_sn_registry:unregister_topic(Registry, self()), - case Channel =:= undefined of - true -> ok; - false -> emqx_channel:terminate(Reason, Channel) - end. + ClientId = emqx_channel:info(clientid, Channel), + case Reason of + {shutdown, takeovered} -> + ok; + _ -> + emqx_sn_registry:unregister_topic(Registry, ClientId) + end, + emqx_channel:terminate(Reason, Channel), + ok. code_change(_Vsn, StateName, State, _Extra) -> {ok, StateName, State}. @@ -719,11 +723,13 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) -> mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> ?SN_UNSUBACK_MSG(MsgId); -mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry}) -> +mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry, + channel = Channel}) -> NewPacketId = if QoS =:= ?QOS_0 -> 0; true -> PacketId end, - {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, self(), Topic) of + ClientId = emqx_channel:info(clientid, Channel), + {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of {predef, PredefTopicId} -> {?SN_PREDEFINED_TOPIC, PredefTopicId}; TopicId when is_integer(TopicId) -> @@ -851,7 +857,7 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname, NChannel = case CleanStart of true -> emqx_channel:terminate(normal, Channel), - emqx_sn_registry:unregister_topic(Registry, self()), + emqx_sn_registry:unregister_topic(Registry, ClientId), emqx_channel:init(#{socktype => udp, sockname => Sockname, peername => Peername, @@ -864,8 +870,9 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname, do_connect(ClientId, CleanStart, Will, Duration, NState). handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, - State=#state{registry = Registry}) -> - case emqx_sn_registry:register_topic(Registry, self(), TopicName) of + State=#state{registry = Registry, channel = Channel}) -> + ClientId = emqx_channel:info(clientid, Channel), + case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of {error, too_large} -> State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, ?SN_INVALID_TOPIC_ID, @@ -879,8 +886,9 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, end; handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId, - State = #state{registry = Registry}) -> - case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of + State = #state{registry = Registry, channel = Channel}) -> + ClientId = emqx_channel:info(clientid, Channel), + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, TopicId, @@ -909,8 +917,9 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) -> proto_unsubscribe(TopicId, MsgId, State); handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId, - State = #state{registry = Registry}) -> - case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of + State = #state{registry = Registry, channel = Channel}) -> + ClientId = emqx_channel:info(clientid, Channel), + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}; PredefinedTopic -> @@ -932,10 +941,11 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) -> <> = TopicName, do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State); do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, - State=#state{registry = Registry}) -> + State=#state{registry = Registry, channel = Channel}) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, NewQoS = get_corrected_qos(QoS), - case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of + ClientId = emqx_channel:info(clientid, Channel), + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID, State)}; @@ -946,7 +956,7 @@ do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, do_publish(?SN_SHORT_TOPIC, STopicName, Data, Flags, MsgId, State) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, NewQoS = get_corrected_qos(QoS), - <> = STopicName , + <> = STopicName, case emqx_topic:wildcard(STopicName) of true -> {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_NOT_SUPPORTED, @@ -974,12 +984,13 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) -> ok. do_puback(TopicId, MsgId, ReturnCode, StateName, - State=#state{registry = Registry}) -> + State=#state{registry = Registry, channel = Channel}) -> case ReturnCode of ?SN_RC_ACCEPTED -> handle_incoming(?PUBACK_PACKET(MsgId), StateName, State); ?SN_RC_INVALID_TOPIC_ID -> - case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of + ClientId = emqx_channel:info(clientid, Channel), + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> {keep_state, State}; TopicName -> %%notice that this TopicName maybe normal or predefined, @@ -1068,9 +1079,10 @@ handle_outgoing(Packets, State) when is_list(Packets) -> end, State, Packets); handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _), - State = #state{registry = Registry}) -> + State = #state{registry = Registry, channel = Channel}) -> ?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]), - TopicId = emqx_sn_registry:lookup_topic_id(Registry, self(), TopicName), + ClientId = emqx_channel:info(clientid, Channel), + TopicId = emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName), case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of true -> register_and_notify_client(PubPkt, State); false -> send_message(mqtt2sn(PubPkt, State), State) @@ -1094,10 +1106,11 @@ replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}. register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt, - State = #state{registry = Registry, pending_topic_ids = Pendings}) -> + State = #state{registry = Registry, pending_topic_ids = Pendings, channel = Channel}) -> MsgId = message_id(PacketId), #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, - TopicId = emqx_sn_registry:register_topic(Registry, self(), TopicName), + ClientId = emqx_channel:info(clientid, Channel), + TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName), ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State), diff --git a/apps/emqx_sn/src/emqx_sn_registry.erl b/apps/emqx_sn/src/emqx_sn_registry.erl index fa17ebc27..53ea84107 100644 --- a/apps/emqx_sn/src/emqx_sn_registry.erl +++ b/apps/emqx_sn/src/emqx_sn_registry.erl @@ -60,11 +60,11 @@ start_link(Tab, PredefTopics) -> stop({_Tab, Pid}) -> gen_server:stop(Pid, normal, infinity). --spec(register_topic(registry(), pid(), binary()) -> integer() | {error, term()}). -register_topic({_, Pid}, ClientPid, TopicName) when is_binary(TopicName) -> +-spec(register_topic(registry(), binary(), binary()) -> integer() | {error, term()}). +register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> case emqx_topic:wildcard(TopicName) of false -> - gen_server:call(Pid, {register, ClientPid, TopicName}); + gen_server:call(Pid, {register, ClientId, TopicName}); %% TopicId: in case of “accepted” the value that will be used as topic %% id by the gateway when sending PUBLISH messages to the client (not %% relevant in case of subscriptions to a short topic name or to a topic @@ -72,22 +72,22 @@ register_topic({_, Pid}, ClientPid, TopicName) when is_binary(TopicName) -> true -> {error, wildcard_topic} end. --spec(lookup_topic(registry(), pid(), pos_integer()) -> undefined | binary()). -lookup_topic({Tab, _Pid}, ClientPid, TopicId) when is_integer(TopicId) -> +-spec(lookup_topic(registry(), binary(), pos_integer()) -> undefined | binary()). +lookup_topic({Tab, _Pid}, ClientId, TopicId) when is_integer(TopicId) -> case lookup_element(Tab, {predef, TopicId}, 2) of undefined -> - lookup_element(Tab, {ClientPid, TopicId}, 2); + lookup_element(Tab, {ClientId, TopicId}, 2); Topic -> Topic end. --spec(lookup_topic_id(registry(), pid(), binary()) +-spec(lookup_topic_id(registry(), binary(), binary()) -> undefined | pos_integer() | {predef, integer()}). -lookup_topic_id({Tab, _Pid}, ClientPid, TopicName) when is_binary(TopicName) -> +lookup_topic_id({Tab, _Pid}, ClientId, TopicName) when is_binary(TopicName) -> case lookup_element(Tab, {predef, TopicName}, 2) of undefined -> - lookup_element(Tab, {ClientPid, TopicName}, 2); + lookup_element(Tab, {ClientId, TopicName}, 2); TopicId -> {predef, TopicId} end. @@ -96,17 +96,17 @@ lookup_topic_id({Tab, _Pid}, ClientPid, TopicName) when is_binary(TopicName) -> lookup_element(Tab, Key, Pos) -> try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end. --spec(unregister_topic(registry(), pid()) -> ok). -unregister_topic({_Tab, Pid}, ClientPid) -> - gen_server:call(Pid, {unregister, ClientPid}). +-spec(unregister_topic(registry(), binary()) -> ok). +unregister_topic({_Tab, Pid}, ClientId) -> + gen_server:call(Pid, {unregister, ClientId}). %%----------------------------------------------------------------------------- init([Tab, PredefTopics]) -> %% {predef, TopicId} -> TopicName %% {predef, TopicName} -> TopicId - %% {ClientPid, TopicId} -> TopicName - %% {ClientPid, TopicName} -> TopicId + %% {ClientId, TopicId} -> TopicName + %% {ClientId, TopicName} -> TopicId _ = ets:new(Tab, [set, public, named_table, {read_concurrency, true}]), MaxPredefId = lists:foldl( fun({TopicId, TopicName}, AccId) -> @@ -116,27 +116,27 @@ init([Tab, PredefTopics]) -> end, 0, PredefTopics), {ok, #state{tab = Tab, max_predef_topic_id = MaxPredefId}}. -handle_call({register, ClientPid, TopicName}, _From, +handle_call({register, ClientId, TopicName}, _From, State = #state{tab = Tab, max_predef_topic_id = PredefId}) -> - case lookup_topic_id({Tab, self()}, ClientPid, TopicName) of + case lookup_topic_id({Tab, self()}, ClientId, TopicName) of {predef, PredefTopicId} when is_integer(PredefTopicId) -> {reply, PredefTopicId, State}; TopicId when is_integer(TopicId) -> {reply, TopicId, State}; undefined -> - case next_topic_id(Tab, PredefId, ClientPid) of + case next_topic_id(Tab, PredefId, ClientId) of TopicId when TopicId >= 16#FFFF -> {reply, {error, too_large}, State}; TopicId -> - _ = ets:insert(Tab, {{ClientPid, next_topic_id}, TopicId + 1}), - _ = ets:insert(Tab, {{ClientPid, TopicName}, TopicId}), - _ = ets:insert(Tab, {{ClientPid, TopicId}, TopicName}), + _ = ets:insert(Tab, {{ClientId, next_topic_id}, TopicId + 1}), + _ = ets:insert(Tab, {{ClientId, TopicName}, TopicId}), + _ = ets:insert(Tab, {{ClientId, TopicId}, TopicName}), {reply, TopicId, State} end end; -handle_call({unregister, ClientPid}, _From, State = #state{tab = Tab}) -> - ets:match_delete(Tab, {{ClientPid, '_'}, '_'}), +handle_call({unregister, ClientId}, _From, State = #state{tab = Tab}) -> + ets:match_delete(Tab, {{ClientId, '_'}, '_'}), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -159,8 +159,8 @@ code_change(_OldVsn, State, _Extra) -> %%----------------------------------------------------------------------------- -next_topic_id(Tab, PredefId, ClientPid) -> - case ets:lookup(Tab, {ClientPid, next_topic_id}) of +next_topic_id(Tab, PredefId, ClientId) -> + case ets:lookup(Tab, {ClientId, next_topic_id}) of [{_, Id}] -> Id; [] -> PredefId + 1 end.