refactor(gw-sn): support mutil-registry process

This commit is contained in:
JianBo He 2021-07-07 18:29:47 +08:00
parent d2430e70a8
commit fc5baf8fd4
3 changed files with 142 additions and 99 deletions

View File

@ -76,6 +76,7 @@
}). }).
-record(state, {gwid :: integer(), -record(state, {gwid :: integer(),
registry :: emqx_sn_registry:registry(),
socket :: port(), socket :: port(),
sockpid :: pid(), sockpid :: pid(),
sockstate :: emqx_types:sockstate(), sockstate :: emqx_types:sockstate(),
@ -145,16 +146,18 @@ kick(GwPid) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([{_, SockPid, Sock}, Peername, Options]) -> init([{_, SockPid, Sock}, Peername, Options]) ->
GwId = proplists:get_value(gateway_id, Options), GwId = maps:get(gateway_id, Options),
Username = proplists:get_value(username, Options, undefined), Registry = maps:get(registry, Options),
Password = proplists:get_value(password, Options, undefined), Username = maps:get(username, Options, undefined),
EnableQos3 = proplists:get_value(enable_qos3, Options, false), Password = maps:get(password, Options, undefined),
IdleTimeout = proplists:get_value(idle_timeout, Options, 30000), EnableQos3 = maps:get(enable_qos3, Options, false),
EnableStats = proplists:get_value(enable_stats, Options, false), IdleTimeout = maps:get(idle_timeout, Options, 30000),
EnableStats = maps:get(enable_stats, Options, false),
case inet:sockname(Sock) of case inet:sockname(Sock) of
{ok, Sockname} -> {ok, Sockname} ->
Channel = emqx_channel:init(?CONN_INFO(Sockname, Peername), ?DEFAULT_CHAN_OPTIONS), Channel = emqx_channel:init(?CONN_INFO(Sockname, Peername), ?DEFAULT_CHAN_OPTIONS),
State = #state{gwid = GwId, State = #state{gwid = GwId,
registry = Registry,
username = Username, username = Username,
password = Password, password = Password,
socket = Sock, socket = Sock,
@ -202,10 +205,16 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State =
idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
topic_id_type = TopicIdType topic_id_type = TopicIdType
}, TopicId, _MsgId, Data)}, }, TopicId, _MsgId, Data)},
State = #state{clientid = ClientId}) -> State = #state{registry = Registry, clientid = ClientId}) ->
TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of
false -> emqx_sn_registry:lookup_topic(ClientId, TopicId); false ->
true -> <<TopicId:16>> emqx_sn_registry:lookup_topic(
Registry,
ClientId,
TopicId
);
true ->
<<TopicId:16>>
end, end,
_ = case TopicName =/= undefined of _ = case TopicName =/= undefined of
true -> true ->
@ -290,9 +299,9 @@ wait_for_will_msg(EventType, EventContent, State) ->
handle_event(EventType, EventContent, wait_for_will_msg, State). handle_event(EventType, EventContent, wait_for_will_msg, State).
connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)}, connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)},
State = #state{clientid = ClientId}) -> State = #state{registry = Registry, clientid = ClientId}) ->
State0 = State0 =
case emqx_sn_registry:register_topic(ClientId, TopicName) of case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
TopicId when is_integer(TopicId) -> TopicId when is_integer(TopicId) ->
?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]), ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]),
send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State); send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State);
@ -579,13 +588,13 @@ handle_event(EventType, EventContent, StateName, State) ->
[StateName, {EventType, EventContent}]), [StateName, {EventType, EventContent}]),
{keep_state, State}. {keep_state, State}.
terminate(Reason, _StateName, #state{channel = Channel}) -> terminate(Reason, _StateName, #state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
case Reason of case Reason of
{shutdown, takeovered} -> {shutdown, takeovered} ->
ok; ok;
_ -> _ ->
emqx_sn_registry:unregister_topic(ClientId) emqx_sn_registry:unregister_topic(Registry, ClientId)
end, end,
emqx_channel:terminate(Reason, Channel), emqx_channel:terminate(Reason, Channel),
ok. ok.
@ -721,12 +730,13 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
?SN_UNSUBACK_MSG(MsgId); ?SN_UNSUBACK_MSG(MsgId);
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) -> mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload),
#state{registry = Registry, channel = Channel}) ->
NewPacketId = if QoS =:= ?QOS_0 -> 0; NewPacketId = if QoS =:= ?QOS_0 -> 0;
true -> PacketId true -> PacketId
end, end,
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of
{predef, PredefTopicId} -> {predef, PredefTopicId} ->
{?SN_PREDEFINED_TOPIC, PredefTopicId}; {?SN_PREDEFINED_TOPIC, PredefTopicId};
TopicId when is_integer(TopicId) -> TopicId when is_integer(TopicId) ->
@ -848,9 +858,9 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
end. end.
handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
State=#state{channel = Channel}) -> State=#state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:register_topic(ClientId, TopicName) of case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
{error, too_large} -> {error, too_large} ->
State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
?SN_INVALID_TOPIC_ID, ?SN_INVALID_TOPIC_ID,
@ -864,9 +874,9 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
end; end;
handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId, handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId,
State = #state{channel = Channel}) -> State = #state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined -> undefined ->
State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
TopicId, TopicId,
@ -895,9 +905,9 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) ->
proto_unsubscribe(TopicId, MsgId, State); proto_unsubscribe(TopicId, MsgId, State);
handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId, handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId,
State = #state{channel = Channel}) -> State = #state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined -> undefined ->
{keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}; {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)};
PredefinedTopic -> PredefinedTopic ->
@ -920,11 +930,11 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) ->
<<TopicId:16>> = TopicName, <<TopicId:16>> = TopicName,
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State); do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State);
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
State=#state{channel = Channel}) -> State=#state{registry = Registry, channel = Channel}) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS), NewQoS = get_corrected_qos(QoS),
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined -> undefined ->
{keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID, {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID,
State)}; State)};
@ -963,13 +973,13 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) ->
ok. ok.
do_puback(TopicId, MsgId, ReturnCode, StateName, do_puback(TopicId, MsgId, ReturnCode, StateName,
State=#state{channel = Channel}) -> State=#state{registry = Registry, channel = Channel}) ->
case ReturnCode of case ReturnCode of
?SN_RC_ACCEPTED -> ?SN_RC_ACCEPTED ->
handle_incoming(?PUBACK_PACKET(MsgId), StateName, State); handle_incoming(?PUBACK_PACKET(MsgId), StateName, State);
?SN_RC_INVALID_TOPIC_ID -> ?SN_RC_INVALID_TOPIC_ID ->
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined -> {keep_state, State}; undefined -> {keep_state, State};
TopicName -> TopicName ->
%%notice that this TopicName maybe normal or predefined, %%notice that this TopicName maybe normal or predefined,
@ -1061,10 +1071,10 @@ handle_outgoing(Packets, State) when is_list(Packets) ->
end, State, Packets); end, State, Packets);
handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _), handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _),
State = #state{channel = Channel}) -> State = #state{registry = Registry, channel = Channel}) ->
?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]), ?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]),
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), TopicId = emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName),
case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
true -> register_and_notify_client(PubPkt, State); true -> register_and_notify_client(PubPkt, State);
false -> send_message(mqtt2sn(PubPkt, State), State) false -> send_message(mqtt2sn(PubPkt, State), State)
@ -1089,11 +1099,11 @@ replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} =
State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}. State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}.
register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt, register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
State = #state{pending_topic_ids = Pendings, channel = Channel}) -> State = #state{registry = Registry, pending_topic_ids = Pendings, channel = Channel}) ->
MsgId = message_id(PacketId), MsgId = message_id(PacketId),
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
TopicId = emqx_sn_registry:register_topic(ClientId, TopicName), TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName),
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State), NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State),

View File

@ -72,11 +72,11 @@ on_insta_create(_Insta = #{ id := InstaId,
end, end,
PredefTopics = maps:get(predefined, RawConf), PredefTopics = maps:get(predefined, RawConf),
{ok, RegistrySvr} = emqx_sn_registry:start_link(PredefTopics), {ok, RegistrySvr} = emqx_sn_registry:start_link(InstaId, PredefTopics),
NRawConf = maps:without( NRawConf = maps:without(
[gateway_id, broadcast, predefined], [broadcast, predefined],
RawConf#{registry => RegistrySvr} RawConf#{registry => emqx_sn_registry:lookup_name(RegistrySvr)}
), ),
Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf), Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf),

View File

@ -14,6 +14,9 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc The MQTT-SN Topic Registry
%%
%% XXX:
-module(emqx_sn_registry). -module(emqx_sn_registry).
-behaviour(gen_server). -behaviour(gen_server).
@ -23,16 +26,15 @@
-define(LOG(Level, Format, Args), -define(LOG(Level, Format, Args),
emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)). emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)).
-export([ start_link/1 -export([ start_link/2
, stop/0
]). ]).
-export([ register_topic/2 -export([ register_topic/3
, unregister_topic/1 , unregister_topic/2
]). ]).
-export([ lookup_topic/2 -export([ lookup_topic/3
, lookup_topic_id/2 , lookup_topic_id/3
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -44,51 +46,54 @@
, code_change/3 , code_change/3
]). ]).
-export([lookup_name/1]).
-define(SN_SHARD, emqx_sn_shard). -define(SN_SHARD, emqx_sn_shard).
-define(TAB, ?MODULE). -record(state, {tabname, max_predef_topic_id = 0}).
-record(state, {max_predef_topic_id = 0}).
-record(emqx_sn_registry, {key, value}). -record(emqx_sn_registry, {key, value}).
%% Mnesia bootstrap %% Mnesia bootstrap
-export([mnesia/1]). %-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). %-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}). %-copy_mnesia({mnesia, [copy]}).
-rlog_shard({?SN_SHARD, ?TAB}). %-rlog_shard({?SN_SHARD, ?TAB}).
%% @doc Create or replicate tables. %%% @doc Create or replicate tables.
-spec(mnesia(boot | copy) -> ok). %-spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> %mnesia(boot) ->
%% Optimize storage % %% Optimize storage
StoreProps = [{ets, [{read_concurrency, true}]}], % StoreProps = [{ets, [{read_concurrency, true}]}],
ok = ekka_mnesia:create_table(?MODULE, [ % ok = ekka_mnesia:create_table(?MODULE, [
{attributes, record_info(fields, emqx_sn_registry)}, % {attributes, record_info(fields, emqx_sn_registry)},
{ram_copies, [node()]}, % {ram_copies, [node()]},
{storage_properties, StoreProps}]); % {storage_properties, StoreProps}]);
%
%mnesia(copy) ->
% ok = ekka_mnesia:copy_table(?MODULE, ram_copies).
mnesia(copy) -> -type registry() :: {Tab :: atom(),
ok = ekka_mnesia:copy_table(?MODULE, ram_copies). RegistryPid :: pid()}.
%%----------------------------------------------------------------------------- %%-----------------------------------------------------------------------------
-spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). -spec start_link(atom(), list())
start_link(PredefTopics) -> -> ignore
ekka_rlog:wait_for_shards([?SN_SHARD], infinity), | {ok, pid()}
gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []). | {error, Reason :: term()}.
start_link(InstaId, PredefTopics) ->
gen_server:start_link(?MODULE, [InstaId, PredefTopics], []).
-spec(stop() -> ok). -spec register_topic(registry(), emqx_types:clientid(), emqx_types:topic())
stop() -> -> integer()
gen_server:stop(?MODULE, normal, infinity). | {error, term()}.
register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) ->
-spec(register_topic(binary(), binary()) -> integer() | {error, term()}).
register_topic(ClientId, TopicName) when is_binary(TopicName) ->
case emqx_topic:wildcard(TopicName) of case emqx_topic:wildcard(TopicName) of
false -> false ->
gen_server:call(?MODULE, {register, ClientId, TopicName}); gen_server:call(Pid, {register, ClientId, TopicName});
%% TopicId: in case of accepted the value that will be used as topic %% TopicId: in case of accepted the value that will be used as topic
%% id by the gateway when sending PUBLISH messages to the client (not %% id by the gateway when sending PUBLISH messages to the client (not
%% relevant in case of subscriptions to a short topic name or to a topic %% relevant in case of subscriptions to a short topic name or to a topic
@ -96,22 +101,24 @@ register_topic(ClientId, TopicName) when is_binary(TopicName) ->
true -> {error, wildcard_topic} true -> {error, wildcard_topic}
end. end.
-spec(lookup_topic(binary(), pos_integer()) -> undefined | binary()). -spec lookup_topic(registry(), emqx_types:clientid(), pos_integer())
lookup_topic(ClientId, TopicId) when is_integer(TopicId) -> -> undefined
case lookup_element(?TAB, {predef, TopicId}, 3) of | binary().
lookup_topic({Tab, _}, ClientId, TopicId) when is_integer(TopicId) ->
case lookup_element(Tab, {predef, TopicId}, 3) of
undefined -> undefined ->
lookup_element(?TAB, {ClientId, TopicId}, 3); lookup_element(Tab, {ClientId, TopicId}, 3);
Topic -> Topic Topic -> Topic
end. end.
-spec(lookup_topic_id(binary(), binary()) -spec lookup_topic_id(registry(), emqx_types:clientid(), emqx_types:topic())
-> undefined -> undefined
| pos_integer() | pos_integer()
| {predef, integer()}). | {predef, integer()}.
lookup_topic_id(ClientId, TopicName) when is_binary(TopicName) -> lookup_topic_id({Tab, _}, ClientId, TopicName) when is_binary(TopicName) ->
case lookup_element(?TAB, {predef, TopicName}, 3) of case lookup_element(Tab, {predef, TopicName}, 3) of
undefined -> undefined ->
lookup_element(?TAB, {ClientId, TopicName}, 3); lookup_element(Tab, {ClientId, TopicName}, 3);
TopicId -> TopicId ->
{predef, TopicId} {predef, TopicId}
end. end.
@ -120,46 +127,69 @@ lookup_topic_id(ClientId, TopicName) when is_binary(TopicName) ->
lookup_element(Tab, Key, Pos) -> lookup_element(Tab, Key, Pos) ->
try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end. try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end.
-spec(unregister_topic(binary()) -> ok). -spec unregister_topic(registry(), emqx_types:clientid()) -> ok.
unregister_topic(ClientId) -> unregister_topic({_, Pid}, ClientId) ->
gen_server:call(?MODULE, {unregister, ClientId}). gen_server:call(Pid, {unregister, ClientId}).
lookup_name(Pid) ->
gen_server:call(Pid, name).
%%----------------------------------------------------------------------------- %%-----------------------------------------------------------------------------
init([PredefTopics]) -> name(InstaId) ->
list_to_atom(lists:concat([emqx_sn_, InstaId, '_registry'])).
init([InstaId, PredefTopics]) ->
%% {predef, TopicId} -> TopicName %% {predef, TopicId} -> TopicName
%% {predef, TopicName} -> TopicId %% {predef, TopicName} -> TopicId
%% {ClientId, TopicId} -> TopicName %% {ClientId, TopicId} -> TopicName
%% {ClientId, TopicName} -> TopicId %% {ClientId, TopicName} -> TopicId
Tab = name(InstaId),
ok = ekka_mnesia:create_table(Tab, [
{ram_copies, [node()]},
{record_name, emqx_sn_registry},
{attributes, record_info(fields, emqx_sn_registry)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]),
ok = ekka_mnesia:copy_table(Tab, ram_copies),
% FIXME:
%ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
MaxPredefId = lists:foldl( MaxPredefId = lists:foldl(
fun(#{id := TopicId, topic := TopicName}, AccId) -> fun(#{id := TopicId, topic := TopicName}, AccId) ->
ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{
value = TopicName}), key = {predef, TopicId},
ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, value = TopicName}
value = TopicId}), ),
ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{
key = {predef, TopicName},
value = TopicId}
),
if TopicId > AccId -> TopicId; true -> AccId end if TopicId > AccId -> TopicId; true -> AccId end
end, 0, PredefTopics), end, 0, PredefTopics),
{ok, #state{max_predef_topic_id = MaxPredefId}}. {ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}.
handle_call({register, ClientId, TopicName}, _From, handle_call({register, ClientId, TopicName}, _From,
State = #state{max_predef_topic_id = PredefId}) -> State = #state{tabname = Tab, max_predef_topic_id = PredefId}) ->
case lookup_topic_id(ClientId, TopicName) of case lookup_topic_id({Tab, self()}, ClientId, TopicName) of
{predef, PredefTopicId} when is_integer(PredefTopicId) -> {predef, PredefTopicId} when is_integer(PredefTopicId) ->
{reply, PredefTopicId, State}; {reply, PredefTopicId, State};
TopicId when is_integer(TopicId) -> TopicId when is_integer(TopicId) ->
{reply, TopicId, State}; {reply, TopicId, State};
undefined -> undefined ->
case next_topic_id(?TAB, PredefId, ClientId) of case next_topic_id(Tab, PredefId, ClientId) of
TopicId when TopicId >= 16#FFFF -> TopicId when TopicId >= 16#FFFF ->
{reply, {error, too_large}, State}; {reply, {error, too_large}, State};
TopicId -> TopicId ->
Fun = fun() -> Fun = fun() ->
mnesia:write(#emqx_sn_registry{key = {ClientId, next_topic_id}, mnesia:write(Tab, #emqx_sn_registry{
value = TopicId + 1}), key = {ClientId, next_topic_id},
mnesia:write(#emqx_sn_registry{key = {ClientId, TopicName}, value = TopicId + 1}, write),
value = TopicId}), mnesia:write(Tab, #emqx_sn_registry{
mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId}, key = {ClientId, TopicName},
value = TopicName}) value = TopicId}, write),
mnesia:write(Tab, #emqx_sn_registry{
key = {ClientId, TopicId},
value = TopicName}, write)
end, end,
case ekka_mnesia:transaction(?SN_SHARD, Fun) of case ekka_mnesia:transaction(?SN_SHARD, Fun) of
{atomic, ok} -> {atomic, ok} ->
@ -170,11 +200,14 @@ handle_call({register, ClientId, TopicName}, _From,
end end
end; end;
handle_call({unregister, ClientId}, _From, State) -> handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) ->
Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}), Registry = mnesia:dirty_match_object({Tab, {ClientId, '_'}, '_'}),
lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Registry), lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(Tab, R) end, Registry),
{reply, ok, State}; {reply, ok, State};
handle_call(name, _From, State = #state{tabname = Tab}) ->
{reply, {Tab, self()}, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(error, "Unexpected request: ~p", [Req]), ?LOG(error, "Unexpected request: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.