diff --git a/apps/emqx_sn/src/emqx_sn_app.erl b/apps/emqx_sn/src/emqx_sn_app.erl index d39e8b34f..9575523f8 100644 --- a/apps/emqx_sn/src/emqx_sn_app.erl +++ b/apps/emqx_sn/src/emqx_sn_app.erl @@ -43,7 +43,8 @@ start(_Type, _Args) -> Addr = application:get_env(emqx_sn, port, 1884), GwId = application:get_env(emqx_sn, gateway_id, 1), - {ok, Sup} = emqx_sn_sup:start_link(Addr, GwId), + PredefTopics = application:get_env(emqx_sn, predefined, []), + {ok, Sup} = emqx_sn_sup:start_link(Addr, GwId, PredefTopics), start_listeners(), {ok, Sup}. @@ -57,13 +58,7 @@ stop(_State) -> -spec start_listeners() -> ok. start_listeners() -> - PredefTopics = application:get_env(emqx_sn, predefined, []), - ListenCfs = [begin - TabName = tabname(Proto, ListenOn), - {ok, RegistryPid} = emqx_sn_sup:start_registry_proc(emqx_sn_sup, TabName, PredefTopics), - {Proto, ListenOn, [{registry, {TabName, RegistryPid}} | Options]} - end || {Proto, ListenOn, Options} <- listeners_confs()], - lists:foreach(fun start_listener/1, ListenCfs). + lists:foreach(fun start_listener/1, listeners_confs()). -spec start_listener(listener()) -> ok. start_listener({Proto, ListenOn, Options}) -> @@ -151,7 +146,3 @@ format({Addr, Port}) when is_list(Addr) -> io_lib:format("~s:~w", [Addr, Port]); format({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). - -tabname(Proto, ListenOn) -> - list_to_atom(lists:flatten(["emqx_sn_registry__", atom_to_list(Proto), "_", format(ListenOn)])). - diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 66022100e..2339961cf 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -82,7 +82,6 @@ sockname :: {inet:ip_address(), inet:port()}, peername :: {inet:ip_address(), inet:port()}, channel :: maybe(emqx_channel:channel()), - registry :: emqx_sn_registry:registry(), clientid :: maybe(binary()), username :: maybe(binary()), password :: maybe(binary()), @@ -147,7 +146,6 @@ kick(GwPid) -> init([{_, SockPid, Sock}, Peername, Options]) -> GwId = proplists:get_value(gateway_id, Options), - Registry = proplists:get_value(registry, Options), Username = proplists:get_value(username, Options, undefined), Password = proplists:get_value(password, Options, undefined), EnableQos3 = proplists:get_value(enable_qos3, Options, false), @@ -165,7 +163,6 @@ init([{_, SockPid, Sock}, Peername, Options]) -> sockname = Sockname, peername = Peername, channel = Channel, - registry = Registry, asleep_timer = emqx_sn_asleep_timer:init(), enable_stats = EnableStats, enable_qos3 = EnableQos3, @@ -205,9 +202,9 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, topic_id_type = TopicIdType }, TopicId, _MsgId, Data)}, - State = #state{clientid = ClientId, registry = Registry}) -> + State = #state{clientid = ClientId}) -> TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of - false -> emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId); + false -> emqx_sn_registry:lookup_topic(ClientId, TopicId); true -> <> end, _ = case TopicName =/= undefined of @@ -292,9 +289,9 @@ wait_for_will_msg(EventType, EventContent, State) -> handle_event(EventType, EventContent, wait_for_will_msg, State). connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)}, - State = #state{clientid = ClientId, registry = Registry}) -> + State = #state{clientid = ClientId}) -> State0 = - case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of + case emqx_sn_registry:register_topic(ClientId, TopicName) of TopicId when is_integer(TopicId) -> ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]), send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State); @@ -580,14 +577,13 @@ handle_event(EventType, EventContent, StateName, State) -> [StateName, {EventType, EventContent}]), {keep_state, State}. -terminate(Reason, _StateName, #state{channel = Channel, - registry = Registry}) -> +terminate(Reason, _StateName, #state{channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), case Reason of {shutdown, takeovered} -> ok; _ -> - emqx_sn_registry:unregister_topic(Registry, ClientId) + emqx_sn_registry:unregister_topic(ClientId) end, emqx_channel:terminate(Reason, Channel), ok. @@ -723,13 +719,12 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) -> mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> ?SN_UNSUBACK_MSG(MsgId); -mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry, - channel = Channel}) -> +mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) -> NewPacketId = if QoS =:= ?QOS_0 -> 0; true -> PacketId end, ClientId = emqx_channel:info(clientid, Channel), - {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of + {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of {predef, PredefTopicId} -> {?SN_PREDEFINED_TOPIC, PredefTopicId}; TopicId when is_integer(TopicId) -> @@ -850,14 +845,13 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname, peername = Peername, - registry = Registry, channel = Channel}) -> emqx_logger:set_metadata_clientid(ClientId), #mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags, NChannel = case CleanStart of true -> emqx_channel:terminate(normal, Channel), - emqx_sn_registry:unregister_topic(Registry, ClientId), + emqx_sn_registry:unregister_topic(ClientId), emqx_channel:init(#{socktype => udp, sockname => Sockname, peername => Peername, @@ -870,9 +864,9 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname, do_connect(ClientId, CleanStart, Will, Duration, NState). handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, - State=#state{registry = Registry, channel = Channel}) -> + State=#state{channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of + case emqx_sn_registry:register_topic(ClientId, TopicName) of {error, too_large} -> State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, ?SN_INVALID_TOPIC_ID, @@ -886,9 +880,9 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, end; handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId, - State = #state{registry = Registry, channel = Channel}) -> + State = #state{channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(ClientId, TopicId) of undefined -> State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, TopicId, @@ -917,9 +911,9 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) -> proto_unsubscribe(TopicId, MsgId, State); handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId, - State = #state{registry = Registry, channel = Channel}) -> + State = #state{channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(ClientId, TopicId) of undefined -> {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}; PredefinedTopic -> @@ -941,11 +935,11 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) -> <> = TopicName, do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State); do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, - State=#state{registry = Registry, channel = Channel}) -> + State=#state{channel = Channel}) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, NewQoS = get_corrected_qos(QoS), ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(ClientId, TopicId) of undefined -> {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID, State)}; @@ -984,13 +978,13 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) -> ok. do_puback(TopicId, MsgId, ReturnCode, StateName, - State=#state{registry = Registry, channel = Channel}) -> + State=#state{channel = Channel}) -> case ReturnCode of ?SN_RC_ACCEPTED -> handle_incoming(?PUBACK_PACKET(MsgId), StateName, State); ?SN_RC_INVALID_TOPIC_ID -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(ClientId, TopicId) of undefined -> {keep_state, State}; TopicName -> %%notice that this TopicName maybe normal or predefined, @@ -1079,10 +1073,10 @@ handle_outgoing(Packets, State) when is_list(Packets) -> end, State, Packets); handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _), - State = #state{registry = Registry, channel = Channel}) -> + State = #state{channel = Channel}) -> ?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]), ClientId = emqx_channel:info(clientid, Channel), - TopicId = emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName), + TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of true -> register_and_notify_client(PubPkt, State); false -> send_message(mqtt2sn(PubPkt, State), State) @@ -1106,11 +1100,11 @@ replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}. register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt, - State = #state{registry = Registry, pending_topic_ids = Pendings, channel = Channel}) -> + State = #state{pending_topic_ids = Pendings, channel = Channel}) -> MsgId = message_id(PacketId), #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, ClientId = emqx_channel:info(clientid, Channel), - TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName), + TopicId = emqx_sn_registry:register_topic(ClientId, TopicName), ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State), diff --git a/apps/emqx_sn/src/emqx_sn_registry.erl b/apps/emqx_sn/src/emqx_sn_registry.erl index 53ea84107..4a3b22585 100644 --- a/apps/emqx_sn/src/emqx_sn_registry.erl +++ b/apps/emqx_sn/src/emqx_sn_registry.erl @@ -23,16 +23,16 @@ -define(LOG(Level, Format, Args), emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)). --export([ start_link/2 - , stop/1 +-export([ start_link/1 + , stop/0 ]). --export([ register_topic/3 - , unregister_topic/2 +-export([ register_topic/2 + , unregister_topic/1 ]). --export([ lookup_topic/3 - , lookup_topic_id/3 +-export([ lookup_topic/2 + , lookup_topic_id/2 ]). %% gen_server callbacks @@ -46,25 +46,45 @@ -define(TAB, ?MODULE). --record(state, {tab, max_predef_topic_id = 0}). +-record(state, {max_predef_topic_id = 0}). --type(registry() :: {ets:tab(), pid()}). +-record(emqx_sn_registry, {key, value}). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + + +%% @doc Create or replicate tables. +-spec(mnesia(boot | copy) -> ok). +mnesia(boot) -> + %% Optimize storage + StoreProps = [{ets, [{read_concurrency, true}]}], + ok = ekka_mnesia:create_table(?MODULE, [ + {attributes, record_info(fields, emqx_sn_registry)}, + {ram_copies, [node()]}, + {storage_properties, StoreProps}]); + +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?MODULE, ram_copies). %%----------------------------------------------------------------------------- --spec(start_link(atom(), list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). -start_link(Tab, PredefTopics) -> - gen_server:start_link(?MODULE, [Tab, PredefTopics], []). +-spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). +start_link(PredefTopics) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []). --spec(stop(registry()) -> ok). -stop({_Tab, Pid}) -> - gen_server:stop(Pid, normal, infinity). +-spec(stop() -> ok). +stop() -> + gen_server:stop(?MODULE, normal, infinity). --spec(register_topic(registry(), binary(), binary()) -> integer() | {error, term()}). -register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> +-spec(register_topic(binary(), binary()) -> integer() | {error, term()}). +register_topic(ClientId, TopicName) when is_binary(TopicName) -> case emqx_topic:wildcard(TopicName) of false -> - gen_server:call(Pid, {register, ClientId, TopicName}); + gen_server:call(?MODULE, {register, ClientId, TopicName}); %% TopicId: in case of “accepted” the value that will be used as topic %% id by the gateway when sending PUBLISH messages to the client (not %% relevant in case of subscriptions to a short topic name or to a topic @@ -72,22 +92,22 @@ register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> true -> {error, wildcard_topic} end. --spec(lookup_topic(registry(), binary(), pos_integer()) -> undefined | binary()). -lookup_topic({Tab, _Pid}, ClientId, TopicId) when is_integer(TopicId) -> - case lookup_element(Tab, {predef, TopicId}, 2) of +-spec(lookup_topic(binary(), pos_integer()) -> undefined | binary()). +lookup_topic(ClientId, TopicId) when is_integer(TopicId) -> + case lookup_element(?TAB, {predef, TopicId}, 3) of undefined -> - lookup_element(Tab, {ClientId, TopicId}, 2); + lookup_element(?TAB, {ClientId, TopicId}, 3); Topic -> Topic end. --spec(lookup_topic_id(registry(), binary(), binary()) +-spec(lookup_topic_id(binary(), binary()) -> undefined | pos_integer() | {predef, integer()}). -lookup_topic_id({Tab, _Pid}, ClientId, TopicName) when is_binary(TopicName) -> - case lookup_element(Tab, {predef, TopicName}, 2) of +lookup_topic_id(ClientId, TopicName) when is_binary(TopicName) -> + case lookup_element(?TAB, {predef, TopicName}, 3) of undefined -> - lookup_element(Tab, {ClientId, TopicName}, 2); + lookup_element(?TAB, {ClientId, TopicName}, 3); TopicId -> {predef, TopicId} end. @@ -96,47 +116,59 @@ lookup_topic_id({Tab, _Pid}, ClientId, TopicName) when is_binary(TopicName) -> lookup_element(Tab, Key, Pos) -> try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end. --spec(unregister_topic(registry(), binary()) -> ok). -unregister_topic({_Tab, Pid}, ClientId) -> - gen_server:call(Pid, {unregister, ClientId}). +-spec(unregister_topic(binary()) -> ok). +unregister_topic(ClientId) -> + gen_server:call(?MODULE, {unregister, ClientId}). %%----------------------------------------------------------------------------- -init([Tab, PredefTopics]) -> +init([PredefTopics]) -> %% {predef, TopicId} -> TopicName %% {predef, TopicName} -> TopicId %% {ClientId, TopicId} -> TopicName %% {ClientId, TopicName} -> TopicId - _ = ets:new(Tab, [set, public, named_table, {read_concurrency, true}]), MaxPredefId = lists:foldl( fun({TopicId, TopicName}, AccId) -> - _ = ets:insert(Tab, {{predef, TopicId}, TopicName}), - _ = ets:insert(Tab, {{predef, TopicName}, TopicId}), + mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, + value = TopicName}), + mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, + value = TopicId}), if TopicId > AccId -> TopicId; true -> AccId end end, 0, PredefTopics), - {ok, #state{tab = Tab, max_predef_topic_id = MaxPredefId}}. + {ok, #state{max_predef_topic_id = MaxPredefId}}. handle_call({register, ClientId, TopicName}, _From, - State = #state{tab = Tab, max_predef_topic_id = PredefId}) -> - case lookup_topic_id({Tab, self()}, ClientId, TopicName) of + State = #state{max_predef_topic_id = PredefId}) -> + case lookup_topic_id(ClientId, TopicName) of {predef, PredefTopicId} when is_integer(PredefTopicId) -> {reply, PredefTopicId, State}; TopicId when is_integer(TopicId) -> {reply, TopicId, State}; undefined -> - case next_topic_id(Tab, PredefId, ClientId) of + case next_topic_id(?TAB, PredefId, ClientId) of TopicId when TopicId >= 16#FFFF -> {reply, {error, too_large}, State}; TopicId -> - _ = ets:insert(Tab, {{ClientId, next_topic_id}, TopicId + 1}), - _ = ets:insert(Tab, {{ClientId, TopicName}, TopicId}), - _ = ets:insert(Tab, {{ClientId, TopicId}, TopicName}), - {reply, TopicId, State} + Fun = fun() -> + mnesia:write(#emqx_sn_registry{key = {ClientId, next_topic_id}, + value = TopicId + 1}), + mnesia:write(#emqx_sn_registry{key = {ClientId, TopicName}, + value = TopicId}), + mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId}, + value = TopicName}) + end, + case mnesia:transaction(Fun) of + {atomic, ok} -> + {reply, TopicId, State}; + {aborted, Error} -> + {reply, {error, Error}, State} + end end end; -handle_call({unregister, ClientId}, _From, State = #state{tab = Tab}) -> - ets:match_delete(Tab, {{ClientId, '_'}, '_'}), +handle_call({unregister, ClientId}, _From, State) -> + Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}), + lists:foreach(fun(R) -> mnesia:dirty_delete_object(R) end, Registry), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -160,7 +192,7 @@ code_change(_OldVsn, State, _Extra) -> %%----------------------------------------------------------------------------- next_topic_id(Tab, PredefId, ClientId) -> - case ets:lookup(Tab, {ClientId, next_topic_id}) of - [{_, Id}] -> Id; + case mnesia:dirty_read(Tab, {ClientId, next_topic_id}) of + [#emqx_sn_registry{value = Id}] -> Id; [] -> PredefId + 1 end. diff --git a/apps/emqx_sn/src/emqx_sn_sup.erl b/apps/emqx_sn/src/emqx_sn_sup.erl index 817aa4d06..3d4fe602f 100644 --- a/apps/emqx_sn/src/emqx_sn_sup.erl +++ b/apps/emqx_sn/src/emqx_sn_sup.erl @@ -18,32 +18,26 @@ -behaviour(supervisor). --export([ start_link/2 - , start_registry_proc/3 +-export([ start_link/3 , init/1 ]). -start_registry_proc(Sup, TabName, PredefTopics) -> - Registry = #{id => TabName, - start => {emqx_sn_registry, start_link, [TabName, PredefTopics]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_sn_registry]}, - handle_ret(supervisor:start_child(Sup, Registry)). +start_link(Addr, GwId, PredefTopics) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [Addr, GwId, PredefTopics]). -start_link(Addr, GwId) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [Addr, GwId]). - -init([{_Ip, Port}, GwId]) -> +init([{_Ip, Port}, GwId, PredefTopics]) -> Broadcast = #{id => emqx_sn_broadcast, start => {emqx_sn_broadcast, start_link, [GwId, Port]}, restart => permanent, shutdown => brutal_kill, type => worker, modules => [emqx_sn_broadcast]}, - {ok, {{one_for_one, 10, 3600}, [Broadcast]}}. + Registry = #{id => emqx_sn_registry, + start => {emqx_sn_registry, start_link, [PredefTopics]}, + restart => permanent, + shutdown => brutal_kill, + type => worker, + modules => [emqx_sn_registry]}, + {ok, {{one_for_one, 10, 3600}, [Broadcast, Registry]}}. -handle_ret({ok, Pid, _Info}) -> {ok, Pid}; -handle_ret(Ret) -> Ret. diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 2972571be..9a869663e 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -1084,7 +1084,7 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) -> {ok, C} = emqtt:start_link(), {ok, _} = emqtt:connect(C), {ok, _} = emqtt:publish(C, TopicName1, Payload1, QoS), - timer:sleep(500), + timer:sleep(100), ok = emqtt:disconnect(C), timer:sleep(50), diff --git a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl b/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl index 8cce4592a..8d320d8ed 100644 --- a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl @@ -16,12 +16,9 @@ -module(emqx_sn_registry_SUITE). --import(proplists, [get_value/2]). - -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx_sn/include/emqx_sn.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(REGISTRY, emqx_sn_registry). @@ -44,84 +41,81 @@ end_per_suite(_Config) -> ok. init_per_testcase(_TestCase, Config) -> + ekka_mnesia:start(), + emqx_sn_registry:mnesia(boot), + mnesia:clear_table(emqx_sn_registry), PredefTopics = application:get_env(emqx_sn, predefined, []), - TabName = emqx_sn_registry, - {ok, Pid} = ?REGISTRY:start_link(TabName, PredefTopics), - [{registray, {TabName, Pid}} | Config]. + {ok, _Pid} = ?REGISTRY:start_link(PredefTopics), + Config. end_per_testcase(_TestCase, Config) -> - ?REGISTRY:stop(get_value(registray, Config)), + ?REGISTRY:stop(), Config. %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- -t_register(Config) -> - Registry = get_value(registray, Config), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)), - emqx_sn_registry:unregister_topic(Registry, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)). +t_register(_Config) -> + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)), + ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)), + emqx_sn_registry:unregister_topic(<<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)). -t_register_case2(Config) -> - Registry = get_value(registray, Config), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic3">>)), - ?REGISTRY:unregister_topic(Registry, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, <<"Topic2">>)). +t_register_case2(_Config) -> + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic3">>)), + ?REGISTRY:unregister_topic(<<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)). -t_reach_maximum(Config) -> - Registry = get_value(registray, Config), - register_a_lot(Registry, ?MAX_PREDEF_ID+1, 16#ffff), - ?assertEqual({error, too_large}, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicABC">>)), +t_reach_maximum(_Config) -> + register_a_lot(?MAX_PREDEF_ID+1, 16#ffff), + ?assertEqual({error, too_large}, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicABC">>)), Topic1 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+1])), Topic2 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+2])), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic1)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic2)), - ?REGISTRY:unregister_topic(Registry, <<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(Registry, <<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Registry, <<"ClientId">>, Topic2)). + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)), + ?REGISTRY:unregister_topic(<<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)). -t_register_case4(Config) -> - Registry = get_value(registray, Config), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicA">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicB">>)), - ?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicC">>)), - ?REGISTRY:unregister_topic(Registry, <<"ClientId">>), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"TopicD">>)). +t_register_case4(_Config) -> + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicA">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicB">>)), + ?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicC">>)), + ?REGISTRY:unregister_topic(<<"ClientId">>), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicD">>)). -t_deny_wildcard_topic(Config) -> - Registry = get_value(registray, Config), - ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"/TopicA/#">>)), - ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Registry, <<"ClientId">>, <<"/+/TopicB">>)). +t_deny_wildcard_topic(_Config) -> + ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/TopicA/#">>)), + ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/+/TopicB">>)). %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- -register_a_lot(_, Max, Max) -> +register_a_lot(Max, Max) -> ok; -register_a_lot(Registry, N, Max) when N < Max -> +register_a_lot(N, Max) when N < Max -> Topic = iolist_to_binary(["Topic", integer_to_list(N)]), - ?assertEqual(N, ?REGISTRY:register_topic(Registry, <<"ClientId">>, Topic)), - register_a_lot(Registry, N+1, Max). + ?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)), + register_a_lot(N+1, Max).