diff --git a/src/emqx_mountpoint.erl b/src/emqx_mountpoint.erl index 66b8da057..d3bfb6caa 100644 --- a/src/emqx_mountpoint.erl +++ b/src/emqx_mountpoint.erl @@ -15,9 +15,6 @@ -module(emqx_mountpoint). -include("emqx.hrl"). --include("logger.hrl"). - --logger_header("[Mountpoint]"). -export([ mount/2 , unmount/2 @@ -32,30 +29,34 @@ %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ - mount(undefined, Any) -> Any; +mount(MountPoint, Topic) when is_binary(Topic) -> + prefix(MountPoint, Topic); mount(MountPoint, Msg = #message{topic = Topic}) -> - Msg#message{topic = <>}; - + Msg#message{topic = prefix(MountPoint, Topic)}; mount(MountPoint, TopicFilters) when is_list(TopicFilters) -> - [{<>, SubOpts} || {Topic, SubOpts} <- TopicFilters]. + [{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters]. -unmount(undefined, Msg) -> - Msg; +unmount(undefined, Any) -> + Any; +unmount(MountPoint, Topic) when is_binary(Topic) -> + case string:prefix(Topic, MountPoint) of + nomatch -> Topic; + Topic1 -> Topic1 + end; unmount(MountPoint, Msg = #message{topic = Topic}) -> - try split_binary(Topic, byte_size(MountPoint)) of - {MountPoint, Topic1} -> Msg#message{topic = Topic1} - catch - _Error:Reason -> - ?LOG(error, "Unmount error : ~p", [Reason]), - Msg + case string:prefix(Topic, MountPoint) of + nomatch -> Msg; + Topic1 -> Msg#message{topic = Topic1} end. +-spec(replvar(maybe(mountpoint()), map()) -> maybe(mountpoint())). replvar(undefined, _Vars) -> undefined; replvar(MountPoint, #{client_id := ClientId, username := Username}) -> - lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]). + lists:foldl(fun feed_var/2, MountPoint, + [{<<"%c">>, ClientId}, {<<"%u">>, Username}]). feed_var({<<"%c">>, ClientId}, MountPoint) -> emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint); @@ -64,3 +65,5 @@ feed_var({<<"%u">>, undefined}, MountPoint) -> feed_var({<<"%u">>, Username}, MountPoint) -> emqx_topic:feed_var(<<"%u">>, Username, MountPoint). +prefix(MountPoint, Topic) -> + <>. \ No newline at end of file diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index b88c82df6..6ecea634e 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -1027,7 +1027,7 @@ raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridg end. mountpoint(Credentials) -> - maps:get(mountpoint, Credentials, undefined). + emqx_mountpoint:replvar(maps:get(mountpoint, Credentials, undefined), Credentials). do_check_banned(_EnableBan = true, Credentials) -> case emqx_banned:check(Credentials) of diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index f1d15d8e2..5ea22ea13 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -14,8 +14,6 @@ -module(emqx_topic). --include("emqx_mqtt.hrl"). - %% APIs -export([ match/2 , validate/1 @@ -33,19 +31,23 @@ , parse/2 ]). +-export_type([ group/0 + , topic/0 + , word/0 + , triple/0 + ]). + -type(group() :: binary()). -type(topic() :: binary()). -type(word() :: '' | '+' | '#' | binary()). -type(words() :: list(word())). -opaque(triple() :: {root | binary(), word(), binary()}). --export_type([group/0, topic/0, word/0, triple/0]). - -define(MAX_TOPIC_LEN, 4096). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% APIs -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% @doc Is wildcard topic? -spec(wildcard(topic() | words()) -> true | false). @@ -60,7 +62,7 @@ wildcard(['+'|_]) -> wildcard([_H|T]) -> wildcard(T). -%% @doc Match Topic name with filter +%% @doc Match Topic name with filter. -spec(match(Name, Filter) -> boolean() when Name :: topic() | words(), Filter :: topic() | words()). @@ -68,7 +70,7 @@ match(<<$$, _/binary>>, <<$+, _/binary>>) -> false; match(<<$$, _/binary>>, <<$#, _/binary>>) -> false; -match(Name, Filter) when is_binary(Name) and is_binary(Filter) -> +match(Name, Filter) when is_binary(Name), is_binary(Filter) -> match(words(Name), words(Filter)); match([], []) -> true; @@ -95,13 +97,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter -> -spec(validate(name | filter, topic()) -> true). validate(_, <<>>) -> error(empty_topic); -validate(_, Topic) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) -> +validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) -> error(topic_too_long); validate(filter, Topic) when is_binary(Topic) -> validate2(words(Topic)); validate(name, Topic) when is_binary(Topic) -> Words = words(Topic), - validate2(Words) and (not wildcard(Words)). + validate2(Words) + andalso (not wildcard(Words)) + orelse error(topic_name_error). validate2([]) -> true; @@ -123,7 +127,7 @@ validate3(<>) when C == $#; C == $+; C == 0 -> validate3(<<_/utf8, Rest/binary>>) -> validate3(Rest). -%% @doc Topic to triples +%% @doc Topic to triples. -spec(triples(topic()) -> list(triple())). triples(Topic) when is_binary(Topic) -> triples(words(Topic), root, []). @@ -206,27 +210,27 @@ join(Words) -> end, {true, <<>>}, [bin(W) || W <- Words]), Bin. --spec(parse(topic()) -> {topic(), #{}}). -parse(Topic) when is_binary(Topic) -> - parse(Topic, #{}). +-spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}). +parse(TopicFilter) when is_binary(TopicFilter) -> + parse(TopicFilter, #{}); +parse({TopicFilter, Options}) when is_binary(TopicFilter) -> + parse(TopicFilter, Options). -parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) -> - error({invalid_topic, Topic}); -parse(Topic = <>, #{share := _Group}) -> - error({invalid_topic, Topic}); -parse(<<"$queue/", Topic1/binary>>, Options) -> - parse(Topic1, maps:put(share, <<"$queue">>, Options)); -parse(Topic = <>, Options) -> - case binary:split(Topic1, <<"/">>) of - [<<>>] -> error({invalid_topic, Topic}); - [_] -> error({invalid_topic, Topic}); - [Group, Topic2] -> - case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of - nomatch -> {Topic2, maps:put(share, Group, Options)}; - _ -> error({invalid_topic, Topic}) +-spec(parse(topic(), map()) -> {topic(), map()}). +parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) -> + error({invalid_topic_filter, TopicFilter}); +parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) -> + error({invalid_topic_filter, TopicFilter}); +parse(<<"$queue/", TopicFilter/binary>>, Options) -> + parse(TopicFilter, Options#{share => <<"$queue">>}); +parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> + case binary:split(Rest, <<"/">>) of + [_Any] -> error({invalid_topic_filter, TopicFilter}); + [ShareName, Filter] -> + case binary:match(ShareName, [<<"+">>, <<"#">>]) of + nomatch -> parse(Filter, Options#{share => ShareName}); + _ -> error({invalid_topic_filter, TopicFilter}) end end; -parse(Topic, Options = #{qos := QoS}) -> - {Topic, Options#{rc => QoS}}; -parse(Topic, Options) -> - {Topic, Options}. +parse(TopicFilter, Options) -> + {TopicFilter, Options}. \ No newline at end of file