From 3c6afee690287307b9d0b1ea788f70e8eec6ee0b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 9 Jun 2023 15:27:53 +0800 Subject: [PATCH] refactor(mqttsn): make the topic registration mechanism simpler --- .../src/emqx_gateway_mqttsn.erl | 13 +- .../src/emqx_mqttsn_channel.erl | 98 ++--- .../src/emqx_mqttsn_registry.erl | 344 ++++++------------ .../test/emqx_sn_registry_SUITE.erl | 106 ++---- 4 files changed, 182 insertions(+), 379 deletions(-) diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.erl b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.erl index 1e32c5b85..23f32497b 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.erl @@ -56,8 +56,7 @@ on_gateway_load( }, Ctx ) -> - %% We Also need to start `emqx_mqttsn_broadcast` & - %% `emqx_mqttsn_registry` process + %% We Also need to start `emqx_mqttsn_broadcast` case maps:get(broadcast, Config, false) of false -> ok; @@ -70,12 +69,9 @@ on_gateway_load( end, PredefTopics = maps:get(predefined, Config, []), - {ok, RegistrySvr} = emqx_mqttsn_registry:start_link(GwName, PredefTopics), + ok = emqx_mqttsn_registry:persist_predefined_topics(PredefTopics), - NConfig = maps:without( - [broadcast, predefined], - Config#{registry => emqx_mqttsn_registry:lookup_name(RegistrySvr)} - ), + NConfig = maps:without([broadcast, predefined], Config), Listeners = emqx_gateway_utils:normalize_config(NConfig), @@ -125,6 +121,7 @@ on_gateway_unload( }, _GwState ) -> - emqx_mqttsn_registry:clean_predefined_topics(GwName, maps:get(predefined, Config, [])), + PredefTopics = maps:get(predefined, Config, []), + ok = emqx_mqttsn_registry:clear_predefined_topics(PredefTopics), Listeners = normalize_config(Config), stop_listeners(GwName, Listeners). diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index b03d878b7..fb8aa76e4 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -479,11 +479,7 @@ handle_in( ?SN_SHORT_TOPIC -> TopicId; ?SN_PREDEFINED_TOPIC -> - emqx_mqttsn_registry:lookup_topic( - Registry, - ?NEG_QOS_CLIENT_ID, - TopicId - ); + emqx_mqttsn_registry:lookup_topic(TopicId, Registry); _ -> undefined end, @@ -631,20 +627,17 @@ handle_in( end; handle_in( ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName), - Channel = #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + Channel = #channel{registry = Registry} ) -> - case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of - TopicId when is_integer(TopicId) -> + case emqx_mqttsn_registry:reg(TopicName, Registry) of + {ok, TopicId, NRegistry} -> ?SLOG(debug, #{ msg => "registered_topic_name", topic_name => TopicName, topic_id => TopicId }), AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), - {ok, {outgoing, AckPacket}, Channel}; + {ok, {outgoing, AckPacket}, Channel#channel{registry = NRegistry}}; {error, too_large} -> ?SLOG(error, #{ msg => "register_topic_failed", @@ -758,7 +751,7 @@ handle_in( ctx = Ctx, registry = Registry, session = Session, - clientinfo = ClientInfo = #{clientid := ClientId} + clientinfo = ClientInfo } ) -> case ReturnCode of @@ -795,7 +788,7 @@ handle_in( {ok, Channel} end; ?SN_RC_INVALID_TOPIC_ID -> - case emqx_mqttsn_registry:lookup_topic(Registry, ClientId, TopicId) of + case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {ok, Channel}; TopicName -> @@ -1100,12 +1093,9 @@ convert_topic_id_to_name({{name, TopicName}, Flags, Data}, Channel) -> {ok, {TopicName, Flags, Data}, Channel}; convert_topic_id_to_name( {{id, TopicId}, Flags, Data}, - Channel = #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + Channel = #channel{registry = Registry} ) -> - case emqx_mqttsn_registry:lookup_topic(Registry, ClientId, TopicId) of + case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {error, ?SN_RC_INVALID_TOPIC_ID}; TopicName -> @@ -1207,15 +1197,12 @@ preproc_subs_type( TopicName, QoS ), - Channel = #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + Channel = #channel{registry = Registry} ) -> %% If the gateway is able accept the subscription, %% it assigns a topic id to the received topic name %% and returns it within a SUBACK message - case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of + case emqx_mqttsn_registry:reg(TopicName, Registry) of {error, too_large} -> {error, ?SN_RC2_EXCEED_LIMITATION}; {error, wildcard_topic} -> @@ -1226,8 +1213,8 @@ preproc_subs_type( %% value when it has the first PUBLISH message with a matching %% topic name to be sent to the client, see also Section 6.10. {ok, {?SN_INVALID_TOPIC_ID, TopicName, QoS}, Channel}; - TopicId when is_integer(TopicId) -> - {ok, {TopicId, TopicName, QoS}, Channel} + {ok, TopicId, NRegistry} -> + {ok, {TopicId, TopicName, QoS}, Channel#channel{registry = NRegistry}} end; preproc_subs_type( ?SN_SUBSCRIBE_MSG_TYPE( @@ -1235,18 +1222,9 @@ preproc_subs_type( TopicId, QoS ), - Channel = #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + Channel = #channel{registry = Registry} ) -> - case - emqx_mqttsn_registry:lookup_topic( - Registry, - ClientId, - TopicId - ) - of + case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {error, ?SN_RC_INVALID_TOPIC_ID}; TopicName -> @@ -1351,18 +1329,9 @@ preproc_unsub_type( ?SN_PREDEFINED_TOPIC, TopicId ), - Channel = #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + Channel = #channel{registry = Registry} ) -> - case - emqx_mqttsn_registry:lookup_topic( - Registry, - ClientId, - TopicId - ) - of + case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {error, not_found}; TopicName -> @@ -1765,10 +1734,7 @@ outgoing_deliver_and_register({Packets, Channel}) -> message_to_packet( MsgId, Message, - #channel{ - registry = Registry, - clientinfo = #{clientid := ClientId} - } + #channel{registry = Registry} ) -> QoS = emqx_message:qos(Message), Topic = emqx_message:topic(Message), @@ -1778,7 +1744,7 @@ message_to_packet( ?QOS_0 -> 0; _ -> MsgId end, - case emqx_mqttsn_registry:lookup_topic_id(Registry, ClientId, Topic) of + case emqx_mqttsn_registry:lookup_topic_id(Topic, Registry) of {predef, PredefTopicId} -> Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_PREDEFINED_TOPIC}, ?SN_PUBLISH_MSG(Flags, PredefTopicId, NMsgId, Payload); @@ -1911,8 +1877,8 @@ handle_info(clean_authz_cache, Channel) -> {ok, Channel}; handle_info({subscribe, _}, Channel) -> {ok, Channel}; -handle_info({register, TopicName}, Channel) -> - case ensure_registered_topic_name(TopicName, Channel) of +handle_info({register, TopicName}, Channel = #channel{registry = Registry}) -> + case emqx_mqttsn_registry:reg(TopicName, Registry) of {error, Reason} -> ?SLOG(error, #{ msg => "register_topic_failed", @@ -1920,8 +1886,8 @@ handle_info({register, TopicName}, Channel) -> reason => Reason }), {ok, Channel}; - {ok, TopicId} -> - handle_out(register, {TopicId, TopicName}, Channel) + {ok, TopicId, NRegistry} -> + handle_out(register, {TopicId, TopicName}, Channel#channel{registry = NRegistry}) end; handle_info(Info, Channel) -> ?SLOG(error, #{ @@ -1940,21 +1906,6 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> shutdown(Reason, Channel) end. -ensure_registered_topic_name( - TopicName, - Channel = #channel{registry = Registry} -) -> - ClientId = clientid(Channel), - case emqx_mqttsn_registry:lookup_topic_id(Registry, ClientId, TopicName) of - undefined -> - case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of - {error, Reason} -> {error, Reason}; - TopicId -> {ok, TopicId} - end; - TopicId -> - {ok, TopicId} - end. - %%-------------------------------------------------------------------- %% Ensure disconnected @@ -2309,9 +2260,6 @@ interval(await_timer, #channel{session = Session}) -> %% Helper functions %%-------------------------------------------------------------------- -clientid(#channel{clientinfo = #{clientid := ClientId}}) -> - ClientId. - run_hooks(Ctx, Name, Args) -> emqx_gateway_ctx:metrics_inc(Ctx, Name), emqx_hooks:run(Name, Args). diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl index e0e138531..59ce39d4b 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl @@ -17,151 +17,42 @@ %% @doc The MQTT-SN Topic Registry -module(emqx_mqttsn_registry). --behaviour(gen_server). - -include("emqx_mqttsn.hrl"). --include_lib("emqx/include/logger.hrl"). - --export([start_link/2]). -export([ - register_topic/3, - unregister_topic/2, - clean_predefined_topics/2 + persist_predefined_topics/1, + clear_predefined_topics/1 ]). -export([ - lookup_topic/3, - lookup_topic_id/3 + init/0, + reg/2, + unreg/2, + lookup_topic/2, + lookup_topic_id/2 ]). -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). +-define(PKEY(Id), {mqttsn, predef_topics, Id}). +-define(PKEY_MAX_PREDEF_ID, {mqttsn, max_predef_topic_id}). -%% Internal exports (RPC) --export([ - do_register/4 -]). - --export([lookup_name/1]). - --define(SN_SHARD, emqx_mqttsn_shard). - --record(state, {tabname, max_predef_topic_id = 0}). - --record(emqx_mqttsn_registry, {key, value}). - --type registry() :: {Tab :: atom(), RegistryPid :: pid()}. +-type registry() :: #{ + %% The next topic id to be assigned to new registration + next_topic_id := pos_integer(), + %% The mapping from topic id to topic name + id_to_name := map(), + %% The mapping from topic name to topic id + name_to_id := map() +}. %%----------------------------------------------------------------------------- --spec start_link(atom(), list()) -> - ignore - | {ok, pid()} - | {error, Reason :: term()}. -start_link(InstaId, PredefTopics) -> - gen_server:start_link(?MODULE, [InstaId, PredefTopics], []). - --spec register_topic(registry(), emqx_types:clientid(), emqx_types:topic()) -> - integer() - | {error, term()}. -register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> - case emqx_topic:wildcard(TopicName) of - false -> - gen_server:call(Pid, {register, ClientId, TopicName}); - %% TopicId: in case of “accepted” the value that will be used as topic - %% id by the gateway when sending PUBLISH messages to the client (not - %% relevant in case of subscriptions to a short topic name or to a topic - %% name which contains wildcard characters) - true -> - {error, wildcard_topic} - end. - --spec lookup_topic(registry(), emqx_types:clientid(), pos_integer()) -> - undefined - | binary(). -lookup_topic({Tab, _}, ClientId, TopicId) when is_integer(TopicId) -> - case lookup_element(Tab, {predef, TopicId}, 3) of - undefined -> - lookup_element(Tab, {ClientId, TopicId}, 3); - Topic -> - Topic - end. - --spec lookup_topic_id(registry(), emqx_types:clientid(), emqx_types:topic()) -> - undefined - | pos_integer() - | {predef, integer()}. -lookup_topic_id({Tab, _}, ClientId, TopicName) when is_binary(TopicName) -> - case lookup_element(Tab, {predef, TopicName}, 3) of - undefined -> - lookup_element(Tab, {ClientId, TopicName}, 3); - TopicId -> - {predef, TopicId} - end. - -%% @private -lookup_element(Tab, Key, Pos) -> - try - ets:lookup_element(Tab, Key, Pos) - catch - error:badarg -> undefined - end. - --spec unregister_topic(registry(), emqx_types:clientid()) -> ok. -unregister_topic({_, Pid}, ClientId) -> - gen_server:call(Pid, {unregister, ClientId}). - --spec clean_predefined_topics(atom(), list()) -> ok. -clean_predefined_topics(InstaId, PredefTopics) when is_list(PredefTopics) -> - Tab = name(InstaId), - F = fun(#{id := TopicId, topic := TopicName0}) -> - TopicName = iolist_to_binary(TopicName0), - mria:dirty_delete(Tab, {predef, TopicId}), - mria:dirty_delete(Tab, {predef, TopicName}) - end, - lists:foreach(F, PredefTopics). - -lookup_name(Pid) -> - gen_server:call(Pid, name). - -%%----------------------------------------------------------------------------- - -name(InstaId) -> - list_to_atom(lists:concat([emqx_mqttsn_, InstaId, '_registry'])). - -init([InstaId, PredefTopics]) -> - %% {predef, TopicId} -> TopicName - %% {predef, TopicName} -> TopicId - %% {ClientId, TopicId} -> TopicName - %% {ClientId, TopicName} -> TopicId - Tab = name(InstaId), - ok = mria:create_table(Tab, [ - {storage, ram_copies}, - {record_name, emqx_mqttsn_registry}, - {attributes, record_info(fields, emqx_mqttsn_registry)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]}, - {rlog_shard, ?SN_SHARD} - ]), - ok = mria:wait_for_tables([Tab]), +-spec persist_predefined_topics(list()) -> ok. +persist_predefined_topics(PredefTopics) when is_list(PredefTopics) -> MaxPredefId = lists:foldl( fun(#{id := TopicId, topic := TopicName0}, AccId) -> TopicName = iolist_to_binary(TopicName0), - mria:dirty_write(Tab, #emqx_mqttsn_registry{ - key = {predef, TopicId}, - value = TopicName - }), - mria:dirty_write(Tab, #emqx_mqttsn_registry{ - key = {predef, TopicName}, - value = TopicId - }), + persistent_term:put(?PKEY(TopicId), TopicName), + persistent_term:put(?PKEY(TopicName), TopicId), case TopicId > AccId of true -> TopicId; false -> AccId @@ -170,106 +61,105 @@ init([InstaId, PredefTopics]) -> 0, PredefTopics ), - {ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}. - -handle_call( - {register, ClientId, TopicName}, - _From, - State = #state{tabname = Tab, max_predef_topic_id = PredefId} -) -> - case lookup_topic_id({Tab, self()}, ClientId, TopicName) of - {predef, PredefTopicId} when is_integer(PredefTopicId) -> - {reply, PredefTopicId, State}; - TopicId when is_integer(TopicId) -> - {reply, TopicId, State}; - undefined -> - case next_topic_id(Tab, PredefId, ClientId) of - TopicId when TopicId >= 16#FFFF -> - {reply, {error, too_large}, State}; - TopicId -> - case - mria:transaction(?SN_SHARD, fun ?MODULE:do_register/4, [ - Tab, ClientId, TopicId, TopicName - ]) - of - {atomic, ok} -> - {reply, TopicId, State}; - {aborted, Error} -> - {reply, {error, Error}, State} - end - end - end; -handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) -> - Registry = mnesia:dirty_match_object( - Tab, - {emqx_mqttsn_registry, {ClientId, '_'}, '_'} - ), - lists:foreach( - fun(R) -> - mria:dirty_delete_object(Tab, R) - end, - Registry - ), - {reply, ok, State}; -handle_call(name, _From, State = #state{tabname = Tab}) -> - {reply, {Tab, self()}, State}; -handle_call(Req, _From, State) -> - ?SLOG(error, #{ - msg => "unexpected_call", - call => Req - }), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - ?SLOG(error, #{ - msg => "unexpected_cast", - cast => Msg - }), - {noreply, State}. - -handle_info(Info, State) -> - ?SLOG(error, #{ - msg => "unexpected_info", - info => Info - }), - {noreply, State}. - -terminate(_Reason, _State) -> + persistent_term:put(?PKEY_MAX_PREDEF_ID, MaxPredefId), ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -do_register(Tab, ClientId, TopicId, TopicName) -> - mnesia:write( - Tab, - #emqx_mqttsn_registry{ - key = {ClientId, next_topic_id}, - value = TopicId + 1 - }, - write +-spec clear_predefined_topics(list()) -> ok. +clear_predefined_topics(PredefTopics) -> + lists:foreach( + fun(#{id := TopicId, topic := TopicName0}) -> + TopicName = iolist_to_binary(TopicName0), + persistent_term:erase(?PKEY(TopicId)), + persistent_term:erase(?PKEY(TopicName)) + end, + PredefTopics ), - mnesia:write( - Tab, - #emqx_mqttsn_registry{ - key = {ClientId, TopicName}, - value = TopicId - }, - write - ), - mnesia:write( - Tab, - #emqx_mqttsn_registry{ - key = {ClientId, TopicId}, - value = TopicName - }, - write - ). + persistent_term:erase(?PKEY_MAX_PREDEF_ID), + ok. -%%----------------------------------------------------------------------------- +-spec init() -> registry(). +init() -> + #{ + next_topic_id => persistent_term:get(?PKEY_MAX_PREDEF_ID, 0), + id_to_name => #{}, + name_to_id => #{} + }. -next_topic_id(Tab, PredefId, ClientId) -> - case mnesia:dirty_read(Tab, {ClientId, next_topic_id}) of - [#emqx_mqttsn_registry{value = Id}] -> Id; - [] -> PredefId + 1 +-spec reg(emqx_types:topic(), registry()) -> + {ok, integer(), registry()} + | {error, term()}. +reg( + TopicName, + Registry = #{ + next_topic_id := TopicId0, + id_to_name := IdMap, + name_to_id := NameMap + } +) when is_binary(TopicName) -> + case emqx_topic:wildcard(TopicName) of + false -> + case maps:find(TopicName, NameMap) of + {ok, TopicId} -> + {ok, TopicId, Registry}; + error -> + case next_topic_id(TopicId0) of + {error, too_large} -> + {error, too_large}; + NextTopicId -> + NRegistry = Registry#{ + next_topic_id := NextTopicId, + id_to_name := maps:put(NextTopicId, TopicName, IdMap), + name_to_id := maps:put(TopicName, NextTopicId, NameMap) + }, + {ok, NextTopicId, NRegistry} + end + end; + %% TopicId: in case of “accepted” the value that will be used as topic + %% id by the gateway when sending PUBLISH messages to the client (not + %% relevant in case of subscriptions to a short topic name or to a topic + %% name which contains wildcard characters) + true -> + {error, wildcard_topic} + end. + +next_topic_id(Id) when is_integer(Id) andalso (Id < 16#FFFF) -> + Id + 1; +next_topic_id(Id) when is_integer(Id) -> + {error, too_large}. + +-spec lookup_topic(pos_integer(), registry()) -> + undefined + | binary(). +lookup_topic(TopicId, _Registry = #{id_to_name := IdMap}) when is_integer(TopicId) -> + case persistent_term:get(?PKEY(TopicId), undefined) of + undefined -> + maps:get(TopicId, IdMap, undefined); + Topic -> + Topic + end. + +-spec lookup_topic_id(emqx_types:topic(), registry()) -> + undefined + | pos_integer() + | {predef, integer()}. +lookup_topic_id(TopicName, _Registry = #{name_to_id := NameMap}) when is_binary(TopicName) -> + case persistent_term:get(?PKEY(TopicName), undefined) of + undefined -> + maps:get(TopicName, NameMap, undefined); + TopicId -> + {predef, TopicId} + end. + +-spec unreg(emqx_types:topic(), registry()) -> registry(). +unreg(TopicName, Registry = #{name_to_id := NameMap, id_to_name := IdMap}) when + is_binary(TopicName) +-> + case maps:find(TopicName, NameMap) of + {ok, TopicId} -> + Registry#{ + name_to_id := maps:remove(TopicName, NameMap), + id_to_name := maps:remove(TopicId, IdMap) + }; + error -> + Registry end. diff --git a/apps/emqx_gateway_mqttsn/test/emqx_sn_registry_SUITE.erl b/apps/emqx_gateway_mqttsn/test/emqx_sn_registry_SUITE.erl index 4d89a802d..6c821ebf4 100644 --- a/apps/emqx_gateway_mqttsn/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_gateway_mqttsn/test/emqx_sn_registry_SUITE.erl @@ -36,96 +36,64 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - application:ensure_all_started(ekka), - mria:start(), Config. end_per_suite(_Config) -> - application:stop(ekka), ok. init_per_testcase(_TestCase, Config) -> - {ok, Pid} = ?REGISTRY:start_link('mqttsn', ?PREDEF_TOPICS), - {Tab, Pid} = ?REGISTRY:lookup_name(Pid), - [{reg, {Tab, Pid}} | Config]. + emqx_mqttsn_registry:persist_predefined_topics(?PREDEF_TOPICS), + Config. end_per_testcase(_TestCase, Config) -> - {Tab, _Pid} = proplists:get_value(reg, Config), - mria:clear_table(Tab), + emqx_mqttsn_registry:clear_predefined_topics(?PREDEF_TOPICS), Config. %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- -t_register(Config) -> - Reg = proplists:get_value(reg, Config), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)), - emqx_mqttsn_registry:unregister_topic(Reg, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)). +t_register(_) -> + Reg = ?REGISTRY:init(), + {ok, ?MAX_PREDEF_ID + 1, Reg1} = ?REGISTRY:reg(<<"Topic1">>, Reg), + {ok, ?MAX_PREDEF_ID + 2, Reg2} = ?REGISTRY:reg(<<"Topic2">>, Reg1), + ?assertMatch({ok, ?MAX_PREDEF_ID + 1, Reg2}, ?REGISTRY:reg(<<"Topic1">>, Reg2)), + ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 1, Reg2)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 2, Reg2)), + ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(<<"Topic1">>, Reg2)), + ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(<<"Topic2">>, Reg2)), -t_register_case2(Config) -> - Reg = proplists:get_value(reg, Config), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic3">>)), - ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)). + Reg3 = emqx_mqttsn_registry:unreg(<<"Topic1">>, Reg2), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 1, Reg3)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"Topic1">>, Reg3)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 2, Reg3)), + ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(<<"Topic2">>, Reg3)), -t_reach_maximum(Config) -> - Reg = proplists:get_value(reg, Config), - register_a_lot(?MAX_PREDEF_ID + 1, 16#ffff, Reg), - ?assertEqual({error, too_large}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicABC">>)), - Topic1 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID + 1])), - Topic2 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID + 2])), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)), - ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)). + ?assertMatch({ok, ?MAX_PREDEF_ID + 3, _Reg4}, ?REGISTRY:reg(<<"Topic3">>, Reg3)). -t_register_case4(Config) -> - Reg = proplists:get_value(reg, Config), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicA">>)), - ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicB">>)), - ?assertEqual(?MAX_PREDEF_ID + 3, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicC">>)), - ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), - ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicD">>)). +t_reach_maximum(_) -> + Reg0 = ?REGISTRY:init(), + Reg = register_a_lot(?MAX_PREDEF_ID + 1, 16#ffff, Reg0), + ?assertEqual({error, too_large}, ?REGISTRY:reg(<<"TopicABC">>, Reg)), + ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(<<"Topic3">>, Reg)), + ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(<<"Topic4">>, Reg)). -t_deny_wildcard_topic(Config) -> - Reg = proplists:get_value(reg, Config), - ?assertEqual( - {error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/TopicA/#">>) - ), - ?assertEqual( - {error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/+/TopicB">>) - ). +t_deny_wildcard_topic(_) -> + Reg = ?REGISTRY:init(), + ?assertEqual({error, wildcard_topic}, ?REGISTRY:reg(<<"/TopicA/#">>, Reg)), + ?assertEqual({error, wildcard_topic}, ?REGISTRY:reg(<<"/+/TopicB">>, Reg)). %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- -register_a_lot(Max, Max, _Reg) -> - ok; -register_a_lot(N, Max, Reg) when N < Max -> +register_a_lot(N, Max, Reg) when N =< Max -> Topic = iolist_to_binary(["Topic", integer_to_list(N)]), - ?assertEqual(N, ?REGISTRY:register_topic(Reg, <<"ClientId">>, Topic)), - register_a_lot(N + 1, Max, Reg). + {ok, ReturnedId, Reg1} = ?REGISTRY:reg(Topic, Reg), + ?assertEqual(N, ReturnedId), + case N == Max of + true -> + Reg1; + _ -> + register_a_lot(N + 1, Max, Reg1) + end.