%% Source: emqx/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl
%% (Erlang, 177 lines, 5.3 KiB)

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The MQTT-SN Topic Registry
-module(emqx_mqttsn_registry).
-include("emqx_mqttsn.hrl").
-export([
persist_predefined_topics/1,
clear_predefined_topics/1
]).
-export([
init/0,
reg/2,
unreg/2,
lookup_topic/2,
lookup_topic_id/2
]).
-export_type([registry/0]).
%% Key under which a predefined topic is stored in `persistent_term'.
%% `Id' is either a topic id or a topic name: every predefined topic is
%% stored once under each, so it can be looked up in both directions.
-define(PKEY(Id), {mqttsn, predef_topics, Id}).
%% Registry of the topics registered dynamically through reg/2. Predefined
%% topics are not kept here; they live in `persistent_term' (see ?PKEY).
-type registry() :: #{
%% The last topic id allocated
last_topic_id := pos_integer(),
%% The mapping from topic id to topic name
id_to_name := map(),
%% The mapping from topic name to topic id
name_to_id := map()
}.
%% One predefined topic as accepted by persist_predefined_topics/1 and
%% clear_predefined_topics/1.
-type predef_topic() :: #{
id := 1..1024,
topic := iolist()
}.
%%-----------------------------------------------------------------------------
%% APIs
%% @doc Publish the predefined topics into `persistent_term'.
%%
%% Every entry is stored twice, as `?PKEY(TopicId) -> TopicName' and as
%% `?PKEY(TopicName) -> TopicId', so lookups work in both directions. Topic
%% names are normalised to binaries with `iolist_to_binary/1'.
%%
%% Raises `error:badarg' when an entry is malformed (not a map with `id' and
%% `topic', id not an integer in 1..1024, topic not an iolist). Before the
%% error is raised the entries of `PredefTopics' are erased again, so a
%% failed call does not leave a partially written set behind.
-spec persist_predefined_topics([predef_topic()]) -> ok.
persist_predefined_topics(PredefTopics) when is_list(PredefTopics) ->
    try
        lists:foreach(fun persist_predefined_topic/1, PredefTopics)
    catch
        _:_ ->
            rollback_predefined_topics(PredefTopics),
            error(badarg)
    end.

%% Store a single predefined topic under both its id and its name.
%% The guard enforces the 1..1024 range declared by `predef_topic()'; a bare
%% `TopicId =< 1024' would also let 0, negative numbers and floats through.
persist_predefined_topic(#{id := TopicId, topic := TopicName0}) when
    is_integer(TopicId), TopicId >= 1, TopicId =< 1024
->
    TopicName = iolist_to_binary(TopicName0),
    persistent_term:put(?PKEY(TopicId), TopicName),
    persistent_term:put(?PKEY(TopicName), TopicId).

%% Best-effort undo for a failed persist_predefined_topics/1.
%% clear_predefined_topics/1 stops at the first entry it cannot process,
%% which is typically the very entry that made persisting fail. That
%% secondary error is deliberately dropped so the caller always sees the
%% `badarg' raised by persist_predefined_topics/1.
rollback_predefined_topics(PredefTopics) ->
    try
        clear_predefined_topics(PredefTopics)
    catch
        error:_ -> ok
    end.
%% @doc Remove the given predefined topics from `persistent_term'.
%%
%% For each entry both keys are erased: the one derived from the topic id
%% and the one derived from the topic name (converted to a binary first).
-spec clear_predefined_topics([predef_topic()]) -> ok.
clear_predefined_topics(PredefTopics) ->
    Erase = fun(#{id := Id, topic := RawName}) ->
        %% Convert before erasing anything, so a bad name leaves both
        %% keys of this entry untouched.
        NameKey = ?PKEY(iolist_to_binary(RawName)),
        persistent_term:erase(?PKEY(Id)),
        persistent_term:erase(NameKey)
    end,
    %% lists:foreach/2 returns `ok', which is this function's result.
    lists:foreach(Erase, PredefTopics).
%% @doc Create an empty registry.
%%
%% Both lookup maps start empty. `last_topic_id' starts at
%% `?SN_MAX_PREDEF_TOPIC_ID', so the first id handed out by reg/2 is the one
%% right after it.
-spec init() -> registry().
init() ->
    NoTopics = #{},
    #{
        name_to_id => NoTopics,
        id_to_name => NoTopics,
        last_topic_id => ?SN_MAX_PREDEF_TOPIC_ID
    }.
%% @doc Register a topic name and return its topic id.
%%
%% Returns `{ok, TopicId, Registry}':
%%   - with the unchanged registry when the name is a predefined topic or is
%%     already registered;
%%   - with an updated registry when a new id had to be allocated.
%% Returns `{error, wildcard_topic}' for names containing wildcards, and
%% whatever error do_reg/2 reports when no id can be allocated.
-spec reg(emqx_types:topic(), registry()) ->
    {ok, integer(), registry()}
    | {error, term()}.
reg(TopicName, Registry) when is_binary(TopicName) ->
    case emqx_topic:wildcard(TopicName) of
        true ->
            %% MQTT-SN: the returned TopicId is the value the gateway uses
            %% when sending PUBLISH messages to the client; it is not
            %% relevant for subscriptions to a short topic name or to a
            %% topic name which contains wildcard characters.
            {error, wildcard_topic};
        false ->
            Known = lookup_topic_id(TopicName, Registry),
            case Known of
                undefined ->
                    do_reg(TopicName, Registry);
                {predef, PredefId} when is_integer(PredefId) ->
                    {ok, PredefId, Registry};
                ExistingId when is_integer(ExistingId) ->
                    {ok, ExistingId, Registry}
            end
    end.
%% Allocate the id following `last_topic_id' for `TopicName' and record the
%% pair in both lookup maps. Returns `{error, too_large}' unchanged when
%% next_topic_id/1 reports that no further id is available.
do_reg(
    TopicName,
    Registry = #{last_topic_id := LastId, id_to_name := ById, name_to_id := ByName}
) ->
    case next_topic_id(LastId) of
        NewId when is_integer(NewId) ->
            Updated = Registry#{
                last_topic_id := NewId,
                id_to_name := ById#{NewId => TopicName},
                name_to_id := ByName#{TopicName => NewId}
            },
            {ok, NewId, Updated};
        {error, too_large} = Exhausted ->
            Exhausted
    end.
%% Return the topic id following `Id', or `{error, too_large}' once the
%% 16-bit topic id space is used up.
%%
%% MQTT-SN reserves the topic ids 16#0000 and 16#FFFF, so the largest id
%% handed out here is 16#FFFE.
next_topic_id(Id) when is_integer(Id), Id < 16#FFFE ->
    Id + 1;
next_topic_id(Id) when is_integer(Id) ->
    {error, too_large}.
%% @doc Resolve a topic id to its topic name.
%%
%% Predefined topics (stored in `persistent_term') take precedence; otherwise
%% the registry's `id_to_name' map is consulted. Returns `undefined' when the
%% id is unknown to both.
-spec lookup_topic(pos_integer(), registry()) ->
    undefined
    | binary().
lookup_topic(TopicId, #{id_to_name := IdMap}) when is_integer(TopicId) ->
    Predefined = persistent_term:get(?PKEY(TopicId), undefined),
    case Predefined =:= undefined of
        true -> maps:get(TopicId, IdMap, undefined);
        false -> Predefined
    end.
%% @doc Resolve a topic name to its topic id.
%%
%% Returns `{predef, TopicId}' when the name is a predefined topic stored in
%% `persistent_term', the bare id when it is found in the registry's
%% `name_to_id' map, and `undefined' otherwise.
-spec lookup_topic_id(emqx_types:topic(), registry()) ->
    undefined
    | pos_integer()
    | {predef, integer()}.
lookup_topic_id(TopicName, #{name_to_id := NameMap}) when is_binary(TopicName) ->
    case persistent_term:get(?PKEY(TopicName), undefined) of
        PredefId when PredefId =/= undefined ->
            {predef, PredefId};
        undefined ->
            %% Not predefined: fall back to the registered topics.
            maps:get(TopicName, NameMap, undefined)
    end.
%% @doc Drop a registered topic name from the registry.
%%
%% Removes the name from `name_to_id' and its id from `id_to_name'. The
%% registry is returned unchanged when the name is not registered.
%% `last_topic_id' is never touched, so the freed id is not handed out again.
-spec unreg(emqx_types:topic(), registry()) -> registry().
unreg(TopicName, Registry = #{name_to_id := NameMap, id_to_name := IdMap}) when
    is_binary(TopicName)
->
    case maps:take(TopicName, NameMap) of
        error ->
            Registry;
        {TopicId, RemainingNames} ->
            Registry#{
                name_to_id := RemainingNames,
                id_to_name := maps:remove(TopicId, IdMap)
            }
    end.