%%-------------------------------------------------------------------- %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_topic). %% APIs -export([ match/2 , validate/1 , validate/2 , levels/1 , tokens/1 , words/1 , wildcard/1 , join/1 , prepend/2 , feed_var/3 , systop/1 , parse/1 , parse/2 ]). -export_type([ group/0 , topic/0 , word/0 ]). -type(group() :: binary()). -type(topic() :: binary()). -type(word() :: '' | '+' | '#' | binary()). -type(words() :: list(word())). -define(MAX_TOPIC_LEN, 4096). %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- %% @doc Is wildcard topic? -spec(wildcard(topic() | words()) -> true | false). wildcard(Topic) when is_binary(Topic) -> wildcard(words(Topic)); wildcard([]) -> false; wildcard(['#'|_]) -> true; wildcard(['+'|_]) -> true; wildcard([_H|T]) -> wildcard(T). %% @doc Match Topic name with filter. -spec(match(Name, Filter) -> boolean() when Name :: topic() | words(), Filter :: topic() | words()). match(<<$$, _/binary>>, <<$+, _/binary>>) -> false; match(<<$$, _/binary>>, <<$#, _/binary>>) -> false; match(Name, Filter) when is_binary(Name), is_binary(Filter) -> match(words(Name), words(Filter)); match([], []) -> true; match([H|T1], [H|T2]) -> match(T1, T2); match([_H|T1], ['+'|T2]) -> match(T1, T2); match(_, ['#']) -> true; match([_H1|_], [_H2|_]) -> false; match([_H1|_], []) -> false; match([], [_H|_T2]) -> false. %% @doc Validate topic name or filter -spec(validate(topic() | {name | filter, topic()}) -> true). validate(Topic) when is_binary(Topic) -> validate(filter, Topic); validate({Type, Topic}) when Type =:= name; Type =:= filter -> validate(Type, Topic). -spec(validate(name | filter, topic()) -> true). validate(_, <<>>) -> error(empty_topic); validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) -> error(topic_too_long); validate(filter, Topic) when is_binary(Topic) -> validate2(words(Topic)); validate(name, Topic) when is_binary(Topic) -> Words = words(Topic), validate2(Words) andalso (not wildcard(Words)) orelse error(topic_name_error). validate2([]) -> true; validate2(['#']) -> % end with '#' true; validate2(['#'|Words]) when length(Words) > 0 -> error('topic_invalid_#'); validate2([''|Words]) -> validate2(Words); validate2(['+'|Words]) -> validate2(Words); validate2([W|Words]) -> validate3(W) andalso validate2(Words). validate3(<<>>) -> true; validate3(<>) when C == $#; C == $+; C == 0 -> error('topic_invalid_char'); validate3(<<_/utf8, Rest/binary>>) -> validate3(Rest). %% @doc Prepend a topic prefix. %% Ensured to have only one / between prefix and suffix. prepend(undefined, W) -> bin(W); prepend(<<>>, W) -> bin(W); prepend(Parent0, W) -> Parent = bin(Parent0), case binary:last(Parent) of $/ -> <>; _ -> <> end. bin('') -> <<>>; bin('+') -> <<"+">>; bin('#') -> <<"#">>; bin(B) when is_binary(B) -> B; bin(L) when is_list(L) -> list_to_binary(L). -spec(levels(topic()) -> pos_integer()). levels(Topic) when is_binary(Topic) -> length(tokens(Topic)). -compile({inline, [tokens/1]}). %% @doc Split topic to tokens. -spec(tokens(topic()) -> list(binary())). tokens(Topic) -> binary:split(Topic, <<"/">>, [global]). %% @doc Split Topic Path to Words -spec(words(topic()) -> words()). words(Topic) when is_binary(Topic) -> [word(W) || W <- tokens(Topic)]. word(<<>>) -> ''; word(<<"+">>) -> '+'; word(<<"#">>) -> '#'; word(Bin) -> Bin. %% @doc '$SYS' Topic. -spec(systop(atom()|string()|binary()) -> topic()). systop(Name) when is_atom(Name); is_list(Name) -> iolist_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])); systop(Name) when is_binary(Name) -> iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/", Name]). -spec(feed_var(binary(), binary(), binary()) -> binary()). feed_var(Var, Val, Topic) -> feed_var(Var, Val, words(Topic), []). feed_var(_Var, _Val, [], Acc) -> join(lists:reverse(Acc)); feed_var(Var, Val, [Var|Words], Acc) -> feed_var(Var, Val, Words, [Val|Acc]); feed_var(Var, Val, [W|Words], Acc) -> feed_var(Var, Val, Words, [W|Acc]). -spec(join(list(binary())) -> binary()). join([]) -> <<>>; join([W]) -> bin(W); join(Words) -> {_, Bin} = lists:foldr( fun(W, {true, Tail}) -> {false, <>}; (W, {false, Tail}) -> {false, <>} end, {true, <<>>}, [bin(W) || W <- Words]), Bin. -spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}). parse(TopicFilter) when is_binary(TopicFilter) -> parse(TopicFilter, #{}); parse({TopicFilter, Options}) when is_binary(TopicFilter) -> parse(TopicFilter, Options). -spec(parse(topic(), map()) -> {topic(), map()}). parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) -> error({invalid_topic_filter, TopicFilter}); parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) -> error({invalid_topic_filter, TopicFilter}); parse(<<"$queue/", TopicFilter/binary>>, Options) -> parse(TopicFilter, Options#{share => <<"$queue">>}); parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> case binary:split(Rest, <<"/">>) of [_Any] -> error({invalid_topic_filter, TopicFilter}); [ShareName, Filter] -> case binary:match(ShareName, [<<"+">>, <<"#">>]) of nomatch -> parse(Filter, Options#{share => ShareName}); _ -> error({invalid_topic_filter, TopicFilter}) end end; parse(TopicFilter, Options) -> {TopicFilter, Options}.