diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 814e37163..4c07d3938 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -389,7 +389,7 @@ open_session( end, case takeover_session(GwName, ClientId) of {ok, ConnMod, ChanPid, Session} -> - ok = emqx_session:resume(ClientInfo, Session), + ok = SessionMod:resume(ClientInfo, Session), case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of {ok, Pendings} -> register_channel( diff --git a/apps/emqx_gateway_mqttsn/include/emqx_mqttsn.hrl b/apps/emqx_gateway_mqttsn/include/emqx_mqttsn.hrl index 5ab2d4a05..2b63e8c12 100644 --- a/apps/emqx_gateway_mqttsn/include/emqx_mqttsn.hrl +++ b/apps/emqx_gateway_mqttsn/include/emqx_mqttsn.hrl @@ -14,6 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- +-define(SN_MAX_PREDEF_TOPIC_ID, 1024). + %%-------------------------------------------------------------------- %% MQTT-SN Types %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.erl b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.erl index 167ee465c..23f32497b 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.erl @@ -56,8 +56,7 @@ on_gateway_load( }, Ctx ) -> - %% We Also need to start `emqx_mqttsn_broadcast` & - %% `emqx_mqttsn_registry` process + %% We Also need to start `emqx_mqttsn_broadcast` case maps:get(broadcast, Config, false) of false -> ok; @@ -70,12 +69,9 @@ on_gateway_load( end, PredefTopics = maps:get(predefined, Config, []), - {ok, RegistrySvr} = emqx_mqttsn_registry:start_link(GwName, PredefTopics), + ok = emqx_mqttsn_registry:persist_predefined_topics(PredefTopics), - NConfig = maps:without( - [broadcast, predefined], - Config#{registry => emqx_mqttsn_registry:lookup_name(RegistrySvr)} - ), + NConfig = maps:without([broadcast, predefined], Config), Listeners = emqx_gateway_utils:normalize_config(NConfig), @@ -125,5 +121,7 @@ on_gateway_unload( }, _GwState ) -> + PredefTopics = maps:get(predefined, Config, []), + ok = emqx_mqttsn_registry:clear_predefined_topics(PredefTopics), Listeners = normalize_config(Config), stop_listeners(GwName, Listeners). diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index b03d878b7..720c288d3 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -51,8 +51,6 @@ -record(channel, { %% Context ctx :: emqx_gateway_ctx:context(), - %% Registry - registry :: emqx_mqttsn_registry:registry(), %% Gateway Id gateway_id :: integer(), %% Enable negative_qos @@ -62,7 +60,7 @@ %% MQTT-SN Client Info clientinfo :: emqx_types:clientinfo(), %% Session - session :: emqx_session:session() | undefined, + session :: emqx_mqttsn_session:session() | undefined, %% Keepalive keepalive :: emqx_keepalive:keepalive() | undefined, %% Will Msg @@ -147,7 +145,6 @@ init( ) -> Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Option, undefined), - Registry = maps:get(registry, Option), GwId = maps:get(gateway_id, Option), EnableNegQoS = maps:get(enable_qos3, Option, true), ListenerId = @@ -180,7 +177,6 @@ init( ), #channel{ ctx = Ctx, - registry = Registry, gateway_id = GwId, enable_negative_qos = EnableNegQoS, conninfo = ConnInfo, @@ -217,7 +213,7 @@ info(conn_state, #channel{conn_state = ConnState}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> - emqx_utils:maybe_apply(fun emqx_session:info/1, Session); + emqx_utils:maybe_apply(fun emqx_mqttsn_session:info/1, Session); info(will_msg, #channel{will_msg = WillMsg}) -> WillMsg; info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> @@ -229,7 +225,7 @@ info(ctx, #channel{ctx = Ctx}) -> stats(#channel{session = undefined}) -> []; stats(#channel{session = Session}) -> - emqx_session:stats(Session). + emqx_mqttsn_session:stats(Session). set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. @@ -388,19 +384,15 @@ process_connect( clientinfo = ClientInfo } ) -> - SessFun = fun(ClientInfoT, _) -> - Conf = emqx_cm:get_session_confs( - ClientInfoT, #{receive_maximum => 1, expiry_interval => 0} - ), - emqx_session:init(Conf) - end, + SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT) end, case emqx_gateway_ctx:open_session( Ctx, CleanStart, ClientInfo, ConnInfo, - SessFun + SessFun, + _SessMod = emqx_mqttsn_session ) of {ok, #{ @@ -470,7 +462,7 @@ handle_in( MsgId, Data ), - Channel = #channel{conn_state = idle, registry = Registry} + Channel = #channel{conn_state = idle} ) -> case check_negative_qos_enable(Publish, Channel) of ok -> @@ -479,11 +471,8 @@ handle_in( ?SN_SHORT_TOPIC -> TopicId; ?SN_PREDEFINED_TOPIC -> - emqx_mqttsn_registry:lookup_topic( - Registry, - ?NEG_QOS_CLIENT_ID, - TopicId - ); + Registry = emqx_mqttsn_registry:init(), + emqx_mqttsn_registry:lookup_topic(TopicId, Registry); _ -> undefined end, @@ -631,20 +620,19 @@ handle_in( end; handle_in( ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName), - Channel = #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + Channel = #channel{session = Session} ) -> - case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of - TopicId when is_integer(TopicId) -> + Registry = emqx_mqttsn_session:registry(Session), + case emqx_mqttsn_registry:reg(TopicName, Registry) of + {ok, TopicId, NRegistry} -> ?SLOG(debug, #{ msg => "registered_topic_name", topic_name => TopicName, topic_id => TopicId }), AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), - {ok, {outgoing, AckPacket}, Channel}; + NSession = emqx_mqttsn_session:set_registry(NRegistry, Session), + {ok, {outgoing, AckPacket}, Channel#channel{session = NSession}}; {error, too_large} -> ?SLOG(error, #{ msg => "register_topic_failed", @@ -756,14 +744,14 @@ handle_in( ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), Channel = #channel{ ctx = Ctx, - registry = Registry, session = Session, - clientinfo = ClientInfo = #{clientid := ClientId} + clientinfo = ClientInfo } ) -> + Registry = emqx_mqttsn_session:registry(Session), case ReturnCode of ?SN_RC_ACCEPTED -> - case emqx_session:puback(ClientInfo, MsgId, Session) of + case emqx_mqttsn_session:puback(ClientInfo, MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent( @@ -795,7 +783,7 @@ handle_in( {ok, Channel} end; ?SN_RC_INVALID_TOPIC_ID -> - case emqx_mqttsn_registry:lookup_topic(Registry, ClientId, TopicId) of + case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {ok, Channel}; TopicName -> @@ -820,7 +808,7 @@ handle_in( clientinfo = ClientInfo } ) -> - case emqx_session:pubrec(ClientInfo, MsgId, Session) of + case emqx_mqttsn_session:pubrec(ClientInfo, MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), NChannel = Channel#channel{session = NSession}, @@ -846,7 +834,7 @@ handle_in( ?SN_PUBREC_MSG(?SN_PUBREL, MsgId), Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo} ) -> - case emqx_session:pubrel(ClientInfo, MsgId, Session) of + case emqx_mqttsn_session:pubrel(ClientInfo, MsgId, Session) of {ok, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, MsgId, NChannel); @@ -863,7 +851,7 @@ handle_in( ?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId), Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo} ) -> - case emqx_session:pubcomp(ClientInfo, MsgId, Session) of + case emqx_mqttsn_session:pubcomp(ClientInfo, MsgId, Session) of {ok, NSession} -> {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent( Channel#channel{session = NSession} @@ -1100,12 +1088,10 @@ convert_topic_id_to_name({{name, TopicName}, Flags, Data}, Channel) -> {ok, {TopicName, Flags, Data}, Channel}; convert_topic_id_to_name( {{id, TopicId}, Flags, Data}, - Channel = #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + Channel = #channel{session = Session} ) -> - case emqx_mqttsn_registry:lookup_topic(Registry, ClientId, TopicId) of + Registry = emqx_mqttsn_session:registry(Session), + case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {error, ?SN_RC_INVALID_TOPIC_ID}; TopicName -> @@ -1174,7 +1160,7 @@ do_publish( Msg = #message{qos = ?QOS_2}, Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo} ) -> - case emqx_session:publish(ClientInfo, MsgId, Msg, Session) of + case emqx_mqttsn_session:publish(ClientInfo, MsgId, Msg, Session) of {ok, _PubRes, NSession} -> NChannel1 = ensure_timer( await_timer, @@ -1207,15 +1193,13 @@ preproc_subs_type( TopicName, QoS ), - Channel = #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + Channel = #channel{session = Session} ) -> + Registry = emqx_mqttsn_session:registry(Session), %% If the gateway is able accept the subscription, %% it assigns a topic id to the received topic name %% and returns it within a SUBACK message - case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of + case emqx_mqttsn_registry:reg(TopicName, Registry) of {error, too_large} -> {error, ?SN_RC2_EXCEED_LIMITATION}; {error, wildcard_topic} -> @@ -1226,8 +1210,9 @@ preproc_subs_type( %% value when it has the first PUBLISH message with a matching %% topic name to be sent to the client, see also Section 6.10. {ok, {?SN_INVALID_TOPIC_ID, TopicName, QoS}, Channel}; - TopicId when is_integer(TopicId) -> - {ok, {TopicId, TopicName, QoS}, Channel} + {ok, TopicId, NRegistry} -> + NSession = emqx_mqttsn_session:set_registry(NRegistry, Session), + {ok, {TopicId, TopicName, QoS}, Channel#channel{session = NSession}} end; preproc_subs_type( ?SN_SUBSCRIBE_MSG_TYPE( @@ -1235,18 +1220,10 @@ preproc_subs_type( TopicId, QoS ), - Channel = #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + Channel = #channel{session = Session} ) -> - case - emqx_mqttsn_registry:lookup_topic( - Registry, - ClientId, - TopicId - ) - of + Registry = emqx_mqttsn_session:registry(Session), + case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {error, ?SN_RC_INVALID_TOPIC_ID}; TopicName -> @@ -1323,7 +1300,7 @@ do_subscribe( ) -> NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts), - case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of + case emqx_mqttsn_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of {ok, NSession} -> {ok, {TopicId, NTopicName, NSubOpts}, Channel#channel{session = NSession}}; {error, ?RC_QUOTA_EXCEEDED} -> @@ -1351,18 +1328,10 @@ preproc_unsub_type( ?SN_PREDEFINED_TOPIC, TopicId ), - Channel = #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + Channel = #channel{session = Session} ) -> - case - emqx_mqttsn_registry:lookup_topic( - Registry, - ClientId, - TopicId - ) - of + Registry = emqx_mqttsn_session:registry(Session), + case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {error, not_found}; TopicName -> @@ -1422,7 +1391,7 @@ do_unsubscribe( SubOpts ), case - emqx_session:unsubscribe( + emqx_mqttsn_session:unsubscribe( ClientInfo, NTopicName, NSubOpts, @@ -1467,9 +1436,9 @@ awake( clientid => ClientId, previous_state => ConnState }), - {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + {ok, Publishes, Session1} = emqx_mqttsn_session:replay(ClientInfo, Session), {NPublishes, NSession} = - case emqx_session:deliver(ClientInfo, [], Session1) of + case emqx_mqttsn_session:deliver(ClientInfo, [], Session1) of {ok, Session2} -> {Publishes, Session2}; {ok, More, Session2} -> @@ -1497,8 +1466,8 @@ goto_asleep_if_buffered_msgs_sent( } ) -> case - emqx_mqueue:is_empty(emqx_session:info(mqueue, Session)) andalso - emqx_inflight:is_empty(emqx_session:info(inflight, Session)) + emqx_mqueue:is_empty(emqx_mqttsn_session:info(mqueue, Session)) andalso + emqx_inflight:is_empty(emqx_mqttsn_session:info(inflight, Session)) of true -> ?SLOG(info, #{ @@ -1591,7 +1560,7 @@ handle_out( register_inflight = undefined } ) -> - {MsgId, NSession} = emqx_session:obtain_next_pkt_id(Session), + {MsgId, NSession} = emqx_mqttsn_session:obtain_next_pkt_id(Session), Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)}, NChannel = Channel#channel{ session = NSession, @@ -1667,7 +1636,7 @@ maybe_resume_session( resuming = true } ) -> - Subs = emqx_session:info(subscriptions, Session), + Subs = emqx_mqttsn_session:info(subscriptions, Session), case subs_resume() andalso map_size(Subs) =/= 0 of true -> TopicNames = lists:filter(fun(T) -> not emqx_topic:wildcard(T) end, maps:keys(Subs)), @@ -1692,9 +1661,9 @@ resume_or_replay_messages( false -> {[], Channel} end, - {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + {ok, Publishes, Session1} = emqx_mqttsn_session:replay(ClientInfo, Session), {NPublishes, NSession} = - case emqx_session:deliver(ClientInfo, NPendings, Session1) of + case emqx_mqttsn_session:deliver(ClientInfo, NPendings, Session1) of {ok, Session2} -> {Publishes, Session2}; {ok, More, Session2} -> @@ -1765,10 +1734,7 @@ outgoing_deliver_and_register({Packets, Channel}) -> message_to_packet( MsgId, Message, - #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + #channel{session = Session} ) -> QoS = emqx_message:qos(Message), Topic = emqx_message:topic(Message), @@ -1778,7 +1744,8 @@ message_to_packet( ?QOS_0 -> 0; _ -> MsgId end, - case emqx_mqttsn_registry:lookup_topic_id(Registry, ClientId, Topic) of + Registry = emqx_mqttsn_session:registry(Session), + case emqx_mqttsn_registry:lookup_topic_id(Topic, Registry) of {predef, PredefTopicId} -> Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_PREDEFINED_TOPIC}, ?SN_PUBLISH_MSG(Flags, PredefTopicId, NMsgId, Payload); @@ -1813,7 +1780,7 @@ handle_call({unsubscribe, Topic}, _From, Channel) -> {ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel), reply_and_update(ok, NChannel); handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> - reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel); + reply({ok, maps:to_list(emqx_mqttsn_session:info(subscriptions, Session))}, Channel); handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel), shutdown_and_reply(kicked, ok, NChannel); @@ -1834,7 +1801,7 @@ handle_call( pendings = Pendings } ) -> - ok = emqx_session:takeover(Session), + ok = emqx_mqttsn_session:takeover(Session), %% TODO: Should not drain deliver here (side effect) Delivers = emqx_utils:drain_deliver(), AllPendings = lists:append(Delivers, Pendings), @@ -1911,8 +1878,9 @@ handle_info(clean_authz_cache, Channel) -> {ok, Channel}; handle_info({subscribe, _}, Channel) -> {ok, Channel}; -handle_info({register, TopicName}, Channel) -> - case ensure_registered_topic_name(TopicName, Channel) of +handle_info({register, TopicName}, Channel = #channel{session = Session}) -> + Registry = emqx_mqttsn_session:registry(Session), + case emqx_mqttsn_registry:reg(TopicName, Registry) of {error, Reason} -> ?SLOG(error, #{ msg => "register_topic_failed", @@ -1920,8 +1888,9 @@ handle_info({register, TopicName}, Channel) -> reason => Reason }), {ok, Channel}; - {ok, TopicId} -> - handle_out(register, {TopicId, TopicName}, Channel) + {ok, TopicId, NRegistry} -> + NSession = emqx_mqttsn_session:set_registry(NRegistry, Session), + handle_out(register, {TopicId, TopicName}, Channel#channel{session = NSession}) end; handle_info(Info, Channel) -> ?SLOG(error, #{ @@ -1940,21 +1909,6 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> shutdown(Reason, Channel) end. -ensure_registered_topic_name( - TopicName, - Channel = #channel{registry = Registry} -) -> - ClientId = clientid(Channel), - case emqx_mqttsn_registry:lookup_topic_id(Registry, ClientId, TopicName) of - undefined -> - case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of - {error, Reason} -> {error, Reason}; - TopicId -> {ok, TopicId} - end; - TopicId -> - {ok, TopicId} - end. - %%-------------------------------------------------------------------- %% Ensure disconnected @@ -2003,7 +1957,7 @@ handle_deliver( ConnState =:= disconnected; ConnState =:= asleep -> - NSession = emqx_session:enqueue( + NSession = emqx_mqttsn_session:enqueue( ClientInfo, ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx), Session @@ -2039,7 +1993,7 @@ handle_deliver( } ) -> case - emqx_session:deliver( + emqx_mqttsn_session:deliver( ClientInfo, ignore_local(Delivers, ClientId, Session, Ctx), Session @@ -2057,7 +2011,7 @@ handle_deliver( end. ignore_local(Delivers, Subscriber, Session, Ctx) -> - Subs = emqx_session:info(subscriptions, Session), + Subs = emqx_mqttsn_session:info(subscriptions, Session), lists:filter( fun({deliver, Topic, #message{from = Publisher}}) -> case maps:find(Topic, Subs) of @@ -2132,7 +2086,7 @@ handle_timeout( retry_delivery, Channel = #channel{session = Session, clientinfo = ClientInfo} ) -> - case emqx_session:retry(ClientInfo, Session) of + case emqx_mqttsn_session:retry(ClientInfo, Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; {ok, Publishes, Timeout, NSession} -> @@ -2157,7 +2111,7 @@ handle_timeout( expire_awaiting_rel, Channel = #channel{session = Session, clientinfo = ClientInfo} ) -> - case emqx_session:expire(ClientInfo, awaiting_rel, Session) of + case emqx_mqttsn_session:expire(ClientInfo, awaiting_rel, Session) of {ok, NSession} -> {ok, clean_timer(await_timer, Channel#channel{session = NSession})}; {ok, Timeout, NSession} -> @@ -2301,17 +2255,14 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); interval(retry_timer, #channel{session = Session}) -> - emqx_session:info(retry_interval, Session); + emqx_mqttsn_session:info(retry_interval, Session); interval(await_timer, #channel{session = Session}) -> - emqx_session:info(await_rel_timeout, Session). + emqx_mqttsn_session:info(await_rel_timeout, Session). %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- -clientid(#channel{clientinfo = #{clientid := ClientId}}) -> - ClientId. - run_hooks(Ctx, Name, Args) -> emqx_gateway_ctx:metrics_inc(Ctx, Name), emqx_hooks:run(Name, Args). diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl index 9db355a9b..3113fc43d 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl @@ -17,64 +17,92 @@ %% @doc The MQTT-SN Topic Registry -module(emqx_mqttsn_registry). --behaviour(gen_server). - -include("emqx_mqttsn.hrl"). --include_lib("emqx/include/logger.hrl"). - --export([start_link/2]). -export([ - register_topic/3, - unregister_topic/2 + persist_predefined_topics/1, + clear_predefined_topics/1 ]). -export([ - lookup_topic/3, - lookup_topic_id/3 + init/0, + reg/2, + unreg/2, + lookup_topic/2, + lookup_topic_id/2 ]). -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). +-define(PKEY(Id), {mqttsn, predef_topics, Id}). -%% Internal exports (RPC) --export([ - do_register/4 -]). +-type registry() :: #{ + %% The last topic id aallocated + last_topic_id := pos_integer(), + %% The mapping from topic id to topic name + id_to_name := map(), + %% The mapping from topic name to topic id + name_to_id := map() +}. --export([lookup_name/1]). - --define(SN_SHARD, emqx_mqttsn_shard). - --record(state, {tabname, max_predef_topic_id = 0}). - --record(emqx_mqttsn_registry, {key, value}). - --type registry() :: {Tab :: atom(), RegistryPid :: pid()}. +-type predef_topic() :: #{ + id := 1..1024, + topic := iolist() +}. %%----------------------------------------------------------------------------- +%% APIs --spec start_link(atom(), list()) -> - ignore - | {ok, pid()} - | {error, Reason :: term()}. -start_link(InstaId, PredefTopics) -> - gen_server:start_link(?MODULE, [InstaId, PredefTopics], []). +-spec persist_predefined_topics([predef_topic()]) -> ok. +persist_predefined_topics(PredefTopics) when is_list(PredefTopics) -> + try + F = fun(#{id := TopicId, topic := TopicName0}) when TopicId =< 1024 -> + TopicName = iolist_to_binary(TopicName0), + persistent_term:put(?PKEY(TopicId), TopicName), + persistent_term:put(?PKEY(TopicName), TopicId) + end, + lists:foreach(F, PredefTopics) + catch + _:_ -> + clear_predefined_topics(PredefTopics), + error(badarg) + end. --spec register_topic(registry(), emqx_types:clientid(), emqx_types:topic()) -> - integer() +-spec clear_predefined_topics([predef_topic()]) -> ok. +clear_predefined_topics(PredefTopics) -> + lists:foreach( + fun(#{id := TopicId, topic := TopicName0}) -> + TopicName = iolist_to_binary(TopicName0), + persistent_term:erase(?PKEY(TopicId)), + persistent_term:erase(?PKEY(TopicName)) + end, + PredefTopics + ), + ok. + +-spec init() -> registry(). +init() -> + #{ + last_topic_id => ?SN_MAX_PREDEF_TOPIC_ID, + id_to_name => #{}, + name_to_id => #{} + }. + +-spec reg(emqx_types:topic(), registry()) -> + {ok, integer(), registry()} | {error, term()}. -register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> +reg( + TopicName, + Registry +) when is_binary(TopicName) -> case emqx_topic:wildcard(TopicName) of false -> - gen_server:call(Pid, {register, ClientId, TopicName}); + case lookup_topic_id(TopicName, Registry) of + {predef, TopicId} when is_integer(TopicId) -> + {ok, TopicId, Registry}; + TopicId when is_integer(TopicId) -> + {ok, TopicId, Registry}; + undefined -> + do_reg(TopicName, Registry) + end; %% TopicId: in case of “accepted” the value that will be used as topic %% id by the gateway when sending PUBLISH messages to the client (not %% relevant in case of subscriptions to a short topic name or to a topic @@ -83,182 +111,64 @@ register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> {error, wildcard_topic} end. --spec lookup_topic(registry(), emqx_types:clientid(), pos_integer()) -> +do_reg( + TopicName, + Registry = #{ + last_topic_id := TopicId0, + id_to_name := IdMap, + name_to_id := NameMap + } +) -> + case next_topic_id(TopicId0) of + {error, too_large} -> + {error, too_large}; + NextTopicId -> + NRegistry = Registry#{ + last_topic_id := NextTopicId, + id_to_name := maps:put(NextTopicId, TopicName, IdMap), + name_to_id := maps:put(TopicName, NextTopicId, NameMap) + }, + {ok, NextTopicId, NRegistry} + end. + +next_topic_id(Id) when is_integer(Id) andalso (Id < 16#FFFF) -> + Id + 1; +next_topic_id(Id) when is_integer(Id) -> + {error, too_large}. + +-spec lookup_topic(pos_integer(), registry()) -> undefined | binary(). -lookup_topic({Tab, _}, ClientId, TopicId) when is_integer(TopicId) -> - case lookup_element(Tab, {predef, TopicId}, 3) of +lookup_topic(TopicId, _Registry = #{id_to_name := IdMap}) when is_integer(TopicId) -> + case persistent_term:get(?PKEY(TopicId), undefined) of undefined -> - lookup_element(Tab, {ClientId, TopicId}, 3); + maps:get(TopicId, IdMap, undefined); Topic -> Topic end. --spec lookup_topic_id(registry(), emqx_types:clientid(), emqx_types:topic()) -> +-spec lookup_topic_id(emqx_types:topic(), registry()) -> undefined | pos_integer() | {predef, integer()}. -lookup_topic_id({Tab, _}, ClientId, TopicName) when is_binary(TopicName) -> - case lookup_element(Tab, {predef, TopicName}, 3) of +lookup_topic_id(TopicName, _Registry = #{name_to_id := NameMap}) when is_binary(TopicName) -> + case persistent_term:get(?PKEY(TopicName), undefined) of undefined -> - lookup_element(Tab, {ClientId, TopicName}, 3); + maps:get(TopicName, NameMap, undefined); TopicId -> {predef, TopicId} end. -%% @private -lookup_element(Tab, Key, Pos) -> - try - ets:lookup_element(Tab, Key, Pos) - catch - error:badarg -> undefined - end. - --spec unregister_topic(registry(), emqx_types:clientid()) -> ok. -unregister_topic({_, Pid}, ClientId) -> - gen_server:call(Pid, {unregister, ClientId}). - -lookup_name(Pid) -> - gen_server:call(Pid, name). - -%%----------------------------------------------------------------------------- - -name(InstaId) -> - list_to_atom(lists:concat([emqx_mqttsn_, InstaId, '_registry'])). - -init([InstaId, PredefTopics]) -> - %% {predef, TopicId} -> TopicName - %% {predef, TopicName} -> TopicId - %% {ClientId, TopicId} -> TopicName - %% {ClientId, TopicName} -> TopicId - Tab = name(InstaId), - ok = mria:create_table(Tab, [ - {storage, ram_copies}, - {record_name, emqx_mqttsn_registry}, - {attributes, record_info(fields, emqx_mqttsn_registry)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}, - {rlog_shard, ?SN_SHARD} - ]), - ok = mria:wait_for_tables([Tab]), - MaxPredefId = lists:foldl( - fun(#{id := TopicId, topic := TopicName0}, AccId) -> - TopicName = iolist_to_binary(TopicName0), - mria:dirty_write(Tab, #emqx_mqttsn_registry{ - key = {predef, TopicId}, - value = TopicName - }), - mria:dirty_write(Tab, #emqx_mqttsn_registry{ - key = {predef, TopicName}, - value = TopicId - }), - case TopicId > AccId of - true -> TopicId; - false -> AccId - end - end, - 0, - PredefTopics - ), - {ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}. - -handle_call( - {register, ClientId, TopicName}, - _From, - State = #state{tabname = Tab, max_predef_topic_id = PredefId} -) -> - case lookup_topic_id({Tab, self()}, ClientId, TopicName) of - {predef, PredefTopicId} when is_integer(PredefTopicId) -> - {reply, PredefTopicId, State}; - TopicId when is_integer(TopicId) -> - {reply, TopicId, State}; - undefined -> - case next_topic_id(Tab, PredefId, ClientId) of - TopicId when TopicId >= 16#FFFF -> - {reply, {error, too_large}, State}; - TopicId -> - case - mria:transaction(?SN_SHARD, fun ?MODULE:do_register/4, [ - Tab, ClientId, TopicId, TopicName - ]) - of - {atomic, ok} -> - {reply, TopicId, State}; - {aborted, Error} -> - {reply, {error, Error}, State} - end - end - end; -handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) -> - Registry = mnesia:dirty_match_object( - Tab, - {emqx_mqttsn_registry, {ClientId, '_'}, '_'} - ), - lists:foreach( - fun(R) -> - mria:dirty_delete_object(Tab, R) - end, - Registry - ), - {reply, ok, State}; -handle_call(name, _From, State = #state{tabname = Tab}) -> - {reply, {Tab, self()}, State}; -handle_call(Req, _From, State) -> - ?SLOG(error, #{ - msg => "unexpected_call", - call => Req - }), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - ?SLOG(error, #{ - msg => "unexpected_cast", - cast => Msg - }), - {noreply, State}. - -handle_info(Info, State) -> - ?SLOG(error, #{ - msg => "unexpected_info", - info => Info - }), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -do_register(Tab, ClientId, TopicId, TopicName) -> - mnesia:write( - Tab, - #emqx_mqttsn_registry{ - key = {ClientId, next_topic_id}, - value = TopicId + 1 - }, - write - ), - mnesia:write( - Tab, - #emqx_mqttsn_registry{ - key = {ClientId, TopicName}, - value = TopicId - }, - write - ), - mnesia:write( - Tab, - #emqx_mqttsn_registry{ - key = {ClientId, TopicId}, - value = TopicName - }, - write - ). - -%%----------------------------------------------------------------------------- - -next_topic_id(Tab, PredefId, ClientId) -> - case mnesia:dirty_read(Tab, {ClientId, next_topic_id}) of - [#emqx_mqttsn_registry{value = Id}] -> Id; - [] -> PredefId + 1 +-spec unreg(emqx_types:topic(), registry()) -> registry(). +unreg(TopicName, Registry = #{name_to_id := NameMap, id_to_name := IdMap}) when + is_binary(TopicName) +-> + case maps:find(TopicName, NameMap) of + {ok, TopicId} -> + Registry#{ + name_to_id := maps:remove(TopicName, NameMap), + id_to_name := maps:remove(TopicId, IdMap) + }; + error -> + Registry end. diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_schema.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_schema.erl index 8adf7a934..08fb854b4 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_schema.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_schema.erl @@ -16,6 +16,7 @@ -module(emqx_mqttsn_schema). +-include("emqx_mqttsn.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). @@ -72,7 +73,7 @@ fields(mqttsn) -> fields(mqttsn_predefined) -> [ {id, - sc(integer(), #{ + sc(range(1, ?SN_MAX_PREDEF_TOPIC_ID), #{ required => true, desc => ?DESC(mqttsn_predefined_id) })}, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl new file mode 100644 index 000000000..7c62800cc --- /dev/null +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl @@ -0,0 +1,144 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mqttsn_session). + +-export([registry/1, set_registry/2]). + +-export([ + init/1, + info/1, + info/2, + stats/1, + resume/2 +]). + +-export([ + publish/4, + subscribe/4, + unsubscribe/4, + puback/3, + pubrec/3, + pubrel/3, + pubcomp/3 +]). + +-export([ + replay/2, + deliver/3, + obtain_next_pkt_id/1, + takeover/1, + enqueue/3, + retry/2, + expire/3 +]). + +-type session() :: #{ + registry := emqx_mqttsn_registry:registry(), + session := emqx_session:session() +}. + +-export_type([session/0]). + +init(ClientInfo) -> + Conf = emqx_cm:get_session_confs( + ClientInfo, #{receive_maximum => 1, expiry_interval => 0} + ), + #{ + registry => emqx_mqttsn_registry:init(), + session => emqx_session:init(Conf) + }. + +registry(#{registry := Registry}) -> + Registry. + +set_registry(Registry, Session) -> + Session#{registry := Registry}. + +info(#{session := Session}) -> + emqx_session:info(Session). + +info(Key, #{session := Session}) -> + emqx_session:info(Key, Session). + +stats(#{session := Session}) -> + emqx_session:stats(Session). + +puback(ClientInfo, MsgId, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session). + +pubrec(ClientInfo, MsgId, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session). + +pubrel(ClientInfo, MsgId, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session). + +pubcomp(ClientInfo, MsgId, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session). + +publish(ClientInfo, MsgId, Msg, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, MsgId, Msg], Session). + +subscribe(ClientInfo, Topic, SubOpts, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, Topic, SubOpts], Session). + +unsubscribe(ClientInfo, Topic, SubOpts, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, Topic, SubOpts], Session). + +replay(ClientInfo, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo], Session). + +deliver(ClientInfo, Delivers, Session1) -> + with_sess(?FUNCTION_NAME, [ClientInfo, Delivers], Session1). + +obtain_next_pkt_id(Session = #{session := Sess}) -> + {Id, Sess1} = emqx_session:obtain_next_pkt_id(Sess), + {Id, Session#{session := Sess1}}. + +takeover(_Session = #{session := Sess}) -> + emqx_session:takeover(Sess). + +enqueue(ClientInfo, Delivers, Session = #{session := Sess}) -> + Sess1 = emqx_session:enqueue(ClientInfo, Delivers, Sess), + Session#{session := Sess1}. + +retry(ClientInfo, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo], Session). + +expire(ClientInfo, awaiting_rel, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, awaiting_rel], Session). + +resume(ClientInfo, #{session := Sess}) -> + emqx_session:resume(ClientInfo, Sess). + +%%-------------------------------------------------------------------- +%% internal funcs + +with_sess(Fun, Args, Session = #{session := Sess}) -> + case apply(emqx_session, Fun, Args ++ [Sess]) of + %% for subscribe + {error, Reason} -> + {error, Reason}; + %% for pubrel + {ok, Sess1} -> + {ok, Session#{session := Sess1}}; + %% for publish and puback + {ok, Result, Sess1} -> + {ok, Result, Session#{session := Sess1}}; + %% for puback + {ok, Msgs, Replies, Sess1} -> + {ok, Msgs, Replies, Session#{session := Sess1}} + end. diff --git a/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl index 73e8d5312..c3fa89c70 100644 --- a/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl @@ -47,12 +47,15 @@ -define(LOG(Format, Args), ct:log("TEST: " ++ Format, Args)). --define(MAX_PRED_TOPIC_ID, 2). +-define(MAX_PRED_TOPIC_ID, ?SN_MAX_PREDEF_TOPIC_ID). -define(PREDEF_TOPIC_ID1, 1). -define(PREDEF_TOPIC_ID2, 2). -define(PREDEF_TOPIC_NAME1, <<"/predefined/topic/name/hello">>). -define(PREDEF_TOPIC_NAME2, <<"/predefined/topic/name/nice">>). --define(ENABLE_QOS3, true). +-define(DEFAULT_PREDEFINED_TOPICS, [ + #{<<"id">> => ?PREDEF_TOPIC_ID1, <<"topic">> => ?PREDEF_TOPIC_NAME1}, + #{<<"id">> => ?PREDEF_TOPIC_ID2, <<"topic">> => ?PREDEF_TOPIC_NAME2} +]). % FLAG NOT USED -define(FNU, 0). @@ -143,6 +146,13 @@ restart_mqttsn_with_mountpoint(Mp) -> Conf#{<<"mountpoint">> => Mp} ). +restart_mqttsn_with_predefined_topics(Topics) -> + Conf = emqx:get_raw_config([gateway, mqttsn]), + emqx_gateway_conf:update_gateway( + mqttsn, + Conf#{<<"predefined">> => Topics} + ). + default_config() -> ?CONF_DEFAULT. @@ -487,6 +497,35 @@ t_subscribe_case08(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). +t_subscribe_predefined_topic(_) -> + Dup = 0, + QoS = 0, + Retain = 0, + Will = 0, + CleanSession = 0, + MsgId = 1, + Socket = ensure_connected_client(?CLIENTID), + send_subscribe_msg_predefined_topic(Socket, 0, ?PREDEF_TOPIC_ID1, 1), + ?assertEqual( + <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + ?PREDEF_TOPIC_ID1:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), + send_disconnect_msg(Socket, undefined), + gen_udp:close(Socket), + + restart_mqttsn_with_predefined_topics([]), + Socket1 = ensure_connected_client(?CLIENTID), + send_subscribe_msg_predefined_topic(Socket1, 0, ?PREDEF_TOPIC_ID1, 1), + ?assertEqual( + <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, 0:16, + MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, + receive_response(Socket1) + ), + send_disconnect_msg(Socket1, undefined), + restart_mqttsn_with_predefined_topics(?DEFAULT_PREDEFINED_TOPICS), + gen_udp:close(Socket1). + t_publish_negqos_enabled(_) -> Dup = 0, QoS = 0, @@ -513,14 +552,11 @@ t_publish_negqos_enabled(_) -> Payload1 = <<20, 21, 22, 23>>, send_publish_msg_normal_topic(Socket, NegQoS, MsgId1, TopicId1, Payload1), timer:sleep(100), - case ?ENABLE_QOS3 of - true -> - Eexp = - <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, - ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, - What = receive_response(Socket), - ?assertEqual(Eexp, What) - end, + Eexp = + <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + What = receive_response(Socket), + ?assertEqual(Eexp, What), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -2777,3 +2813,9 @@ flush(Msgs) -> M -> flush([M | Msgs]) after 0 -> lists:reverse(Msgs) end. + +ensure_connected_client(ClientId) -> + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, ClientId), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + Socket. diff --git a/apps/emqx_gateway_mqttsn/test/emqx_sn_registry_SUITE.erl b/apps/emqx_gateway_mqttsn/test/emqx_sn_registry_SUITE.erl index 4d89a802d..8d60570a0 100644 --- a/apps/emqx_gateway_mqttsn/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_gateway_mqttsn/test/emqx_sn_registry_SUITE.erl @@ -19,10 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). +-include("emqx_mqttsn.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(REGISTRY, emqx_mqttsn_registry). --define(MAX_PREDEF_ID, 2). +-define(MAX_PREDEF_ID, ?SN_MAX_PREDEF_TOPIC_ID). -define(PREDEF_TOPICS, [ #{id => 1, topic => <<"/predefined/topic/name/hello">>}, #{id => 2, topic => <<"/predefined/topic/name/nice">>} @@ -36,96 +37,64 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - application:ensure_all_started(ekka), - mria:start(), Config. end_per_suite(_Config) -> - application:stop(ekka), ok. init_per_testcase(_TestCase, Config) -> - {ok, Pid} = ?REGISTRY:start_link('mqttsn', ?PREDEF_TOPICS), - {Tab, Pid} = ?REGISTRY:lookup_name(Pid), - [{reg, {Tab, Pid}} | Config]. + emqx_mqttsn_registry:persist_predefined_topics(?PREDEF_TOPICS), + Config. end_per_testcase(_TestCase, Config) -> - {Tab, _Pid} = proplists:get_value(reg, Config), - mria:clear_table(Tab), + emqx_mqttsn_registry:clear_predefined_topics(?PREDEF_TOPICS), Config. %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- -t_register(Config) -> - Reg = proplists:get_value(reg, Config), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)), - emqx_mqttsn_registry:unregister_topic(Reg, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)). +t_register(_) -> + Reg = ?REGISTRY:init(), + {ok, ?MAX_PREDEF_ID + 1, Reg1} = ?REGISTRY:reg(<<"Topic1">>, Reg), + {ok, ?MAX_PREDEF_ID + 2, Reg2} = ?REGISTRY:reg(<<"Topic2">>, Reg1), + ?assertMatch({ok, ?MAX_PREDEF_ID + 1, Reg2}, ?REGISTRY:reg(<<"Topic1">>, Reg2)), + ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 1, Reg2)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 2, Reg2)), + ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(<<"Topic1">>, Reg2)), + ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(<<"Topic2">>, Reg2)), -t_register_case2(Config) -> - Reg = proplists:get_value(reg, Config), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic3">>)), - ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)). + Reg3 = emqx_mqttsn_registry:unreg(<<"Topic1">>, Reg2), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 1, Reg3)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"Topic1">>, Reg3)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 2, Reg3)), + ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(<<"Topic2">>, Reg3)), -t_reach_maximum(Config) -> - Reg = proplists:get_value(reg, Config), - register_a_lot(?MAX_PREDEF_ID + 1, 16#ffff, Reg), - ?assertEqual({error, too_large}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicABC">>)), - Topic1 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID + 1])), - Topic2 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID + 2])), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)), - ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)). + ?assertMatch({ok, ?MAX_PREDEF_ID + 3, _Reg4}, ?REGISTRY:reg(<<"Topic3">>, Reg3)). -t_register_case4(Config) -> - Reg = proplists:get_value(reg, Config), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicA">>)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicB">>)), - ?assertEqual(?MAX_PREDEF_ID + 3, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicC">>)), - ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicD">>)). +t_reach_maximum(_) -> + Reg0 = ?REGISTRY:init(), + Reg = register_a_lot(?MAX_PREDEF_ID + 1, 16#ffff, Reg0), + ?assertEqual({error, too_large}, ?REGISTRY:reg(<<"TopicABC">>, Reg)), + ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(<<"Topic1025">>, Reg)), + ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(<<"Topic1026">>, Reg)). -t_deny_wildcard_topic(Config) -> - Reg = proplists:get_value(reg, Config), - ?assertEqual( - {error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/TopicA/#">>) - ), - ?assertEqual( - {error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/+/TopicB">>) - ). +t_deny_wildcard_topic(_) -> + Reg = ?REGISTRY:init(), + ?assertEqual({error, wildcard_topic}, ?REGISTRY:reg(<<"/TopicA/#">>, Reg)), + ?assertEqual({error, wildcard_topic}, ?REGISTRY:reg(<<"/+/TopicB">>, Reg)). %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- -register_a_lot(Max, Max, _Reg) -> - ok; -register_a_lot(N, Max, Reg) when N < Max -> +register_a_lot(N, Max, Reg) when N =< Max -> Topic = iolist_to_binary(["Topic", integer_to_list(N)]), - ?assertEqual(N, ?REGISTRY:register_topic(Reg, <<"ClientId">>, Topic)), - register_a_lot(N + 1, Max, Reg). + {ok, ReturnedId, Reg1} = ?REGISTRY:reg(Topic, Reg), + ?assertEqual(N, ReturnedId), + case N == Max of + true -> + Reg1; + _ -> + register_a_lot(N + 1, Max, Reg1) + end. diff --git a/changes/ce/fix-10955.en.md b/changes/ce/fix-10955.en.md new file mode 100644 index 000000000..a08b80560 --- /dev/null +++ b/changes/ce/fix-10955.en.md @@ -0,0 +1 @@ +Fix the issue in MQTT-SN gateway where deleting Predefined Topics configuration does not work.