emqx/src/emqx_topic.erl

222 lines
6.6 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_topic).
%% APIs
-export([ match/2
, validate/1
, validate/2
, levels/1
, tokens/1
, words/1
, wildcard/1
, join/1
, prepend/2
, feed_var/3
, systop/1
, parse/1
, parse/2
]).
-export_type([ group/0
, topic/0
, word/0
]).
-type(group() :: binary()).
-type(topic() :: binary()).
-type(word() :: '' | '+' | '#' | binary()).
-type(words() :: list(word())).
-define(MAX_TOPIC_LEN, 4096).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%% @doc Is wildcard topic?
-spec(wildcard(topic() | words()) -> true | false).
wildcard(Topic) when is_binary(Topic) ->
wildcard(words(Topic));
wildcard([]) ->
false;
wildcard(['#'|_]) ->
true;
wildcard(['+'|_]) ->
true;
wildcard([_H|T]) ->
wildcard(T).
%% @doc Match Topic name with filter.
-spec(match(Name, Filter) -> boolean() when
Name :: topic() | words(),
Filter :: topic() | words()).
match(<<$$, _/binary>>, <<$+, _/binary>>) ->
false;
match(<<$$, _/binary>>, <<$#, _/binary>>) ->
false;
match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
match(words(Name), words(Filter));
match([], []) ->
true;
match([H|T1], [H|T2]) ->
match(T1, T2);
match([_H|T1], ['+'|T2]) ->
match(T1, T2);
match(_, ['#']) ->
true;
match([_H1|_], [_H2|_]) ->
false;
match([_H1|_], []) ->
false;
match([], [_H|_T2]) ->
false.
%% @doc Validate topic name or filter
-spec(validate(topic() | {name | filter, topic()}) -> true).
validate(Topic) when is_binary(Topic) ->
validate(filter, Topic);
validate({Type, Topic}) when Type =:= name; Type =:= filter ->
validate(Type, Topic).
-spec(validate(name | filter, topic()) -> true).
validate(_, <<>>) ->
error(empty_topic);
validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
error(topic_too_long);
validate(filter, Topic) when is_binary(Topic) ->
validate2(words(Topic));
validate(name, Topic) when is_binary(Topic) ->
Words = words(Topic),
validate2(Words)
andalso (not wildcard(Words))
orelse error(topic_name_error).
validate2([]) ->
true;
validate2(['#']) -> % end with '#'
true;
validate2(['#'|Words]) when length(Words) > 0 ->
error('topic_invalid_#');
validate2([''|Words]) ->
validate2(Words);
validate2(['+'|Words]) ->
validate2(Words);
validate2([W|Words]) ->
validate3(W) andalso validate2(Words).
validate3(<<>>) ->
true;
validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
error('topic_invalid_char');
validate3(<<_/utf8, Rest/binary>>) ->
validate3(Rest).
%% @doc Prepend a topic prefix.
%% Ensured to have only one / between prefix and suffix.
prepend(undefined, W) -> bin(W);
prepend(<<>>, W) -> bin(W);
prepend(Parent0, W) ->
Parent = bin(Parent0),
case binary:last(Parent) of
$/ -> <<Parent/binary, (bin(W))/binary>>;
_ -> <<Parent/binary, $/, (bin(W))/binary>>
end.
bin('') -> <<>>;
bin('+') -> <<"+">>;
bin('#') -> <<"#">>;
bin(B) when is_binary(B) -> B;
bin(L) when is_list(L) -> list_to_binary(L).
-spec(levels(topic()) -> pos_integer()).
levels(Topic) when is_binary(Topic) ->
length(tokens(Topic)).
-compile({inline, [tokens/1]}).
%% @doc Split topic to tokens.
-spec(tokens(topic()) -> list(binary())).
tokens(Topic) ->
binary:split(Topic, <<"/">>, [global]).
%% @doc Split Topic Path to Words
-spec(words(topic()) -> words()).
words(Topic) when is_binary(Topic) ->
[word(W) || W <- tokens(Topic)].
word(<<>>) -> '';
word(<<"+">>) -> '+';
word(<<"#">>) -> '#';
word(Bin) -> Bin.
%% @doc '$SYS' Topic.
-spec(systop(atom()|string()|binary()) -> topic()).
systop(Name) when is_atom(Name); is_list(Name) ->
iolist_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name]));
systop(Name) when is_binary(Name) ->
iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/", Name]).
-spec(feed_var(binary(), binary(), binary()) -> binary()).
feed_var(Var, Val, Topic) ->
feed_var(Var, Val, words(Topic), []).
feed_var(_Var, _Val, [], Acc) ->
join(lists:reverse(Acc));
feed_var(Var, Val, [Var|Words], Acc) ->
feed_var(Var, Val, Words, [Val|Acc]);
feed_var(Var, Val, [W|Words], Acc) ->
feed_var(Var, Val, Words, [W|Acc]).
-spec(join(list(binary())) -> binary()).
join([]) ->
<<>>;
join([W]) ->
bin(W);
join(Words) ->
{_, Bin} = lists:foldr(
fun(W, {true, Tail}) ->
{false, <<W/binary, Tail/binary>>};
(W, {false, Tail}) ->
{false, <<W/binary, "/", Tail/binary>>}
end, {true, <<>>}, [bin(W) || W <- Words]),
Bin.
-spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}).
parse(TopicFilter) when is_binary(TopicFilter) ->
parse(TopicFilter, #{});
parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
parse(TopicFilter, Options).
-spec(parse(topic(), map()) -> {topic(), map()}).
parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
error({invalid_topic_filter, TopicFilter});
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
error({invalid_topic_filter, TopicFilter});
parse(<<"$queue/", TopicFilter/binary>>, Options) ->
parse(TopicFilter, Options#{share => <<"$queue">>});
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
case binary:split(Rest, <<"/">>) of
[_Any] -> error({invalid_topic_filter, TopicFilter});
[ShareName, Filter] ->
case binary:match(ShareName, [<<"+">>, <<"#">>]) of
nomatch -> parse(Filter, Options#{share => ShareName});
_ -> error({invalid_topic_filter, TopicFilter})
end
end;
parse(TopicFilter, Options) ->
{TopicFilter, Options}.