Fix unmount crash
This commit is contained in:
parent
be2ce93a2c
commit
997b693400
|
@ -15,9 +15,6 @@
|
|||
-module(emqx_mountpoint).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-logger_header("[Mountpoint]").
|
||||
|
||||
-export([ mount/2
|
||||
, unmount/2
|
||||
|
@ -32,30 +29,34 @@
|
|||
%%------------------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
mount(undefined, Any) ->
|
||||
Any;
|
||||
mount(MountPoint, Topic) when is_binary(Topic) ->
|
||||
prefix(MountPoint, Topic);
|
||||
mount(MountPoint, Msg = #message{topic = Topic}) ->
|
||||
Msg#message{topic = <<MountPoint/binary, Topic/binary>>};
|
||||
|
||||
Msg#message{topic = prefix(MountPoint, Topic)};
|
||||
mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
|
||||
[{<<MountPoint/binary, Topic/binary>>, SubOpts} || {Topic, SubOpts} <- TopicFilters].
|
||||
[{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].
|
||||
|
||||
unmount(undefined, Msg) ->
|
||||
Msg;
|
||||
unmount(undefined, Any) ->
|
||||
Any;
|
||||
unmount(MountPoint, Topic) when is_binary(Topic) ->
|
||||
case string:prefix(Topic, MountPoint) of
|
||||
nomatch -> Topic;
|
||||
Topic1 -> Topic1
|
||||
end;
|
||||
unmount(MountPoint, Msg = #message{topic = Topic}) ->
|
||||
try split_binary(Topic, byte_size(MountPoint)) of
|
||||
{MountPoint, Topic1} -> Msg#message{topic = Topic1}
|
||||
catch
|
||||
_Error:Reason ->
|
||||
?LOG(error, "Unmount error : ~p", [Reason]),
|
||||
Msg
|
||||
case string:prefix(Topic, MountPoint) of
|
||||
nomatch -> Msg;
|
||||
Topic1 -> Msg#message{topic = Topic1}
|
||||
end.
|
||||
|
||||
-spec(replvar(maybe(mountpoint()), map()) -> maybe(mountpoint())).
|
||||
replvar(undefined, _Vars) ->
|
||||
undefined;
|
||||
replvar(MountPoint, #{client_id := ClientId, username := Username}) ->
|
||||
lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
|
||||
lists:foldl(fun feed_var/2, MountPoint,
|
||||
[{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
|
||||
|
||||
feed_var({<<"%c">>, ClientId}, MountPoint) ->
|
||||
emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
|
||||
|
@ -64,3 +65,5 @@ feed_var({<<"%u">>, undefined}, MountPoint) ->
|
|||
feed_var({<<"%u">>, Username}, MountPoint) ->
|
||||
emqx_topic:feed_var(<<"%u">>, Username, MountPoint).
|
||||
|
||||
prefix(MountPoint, Topic) ->
|
||||
<<MountPoint/binary, Topic/binary>>.
|
|
@ -1027,7 +1027,7 @@ raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridg
|
|||
end.
|
||||
|
||||
mountpoint(Credentials) ->
|
||||
maps:get(mountpoint, Credentials, undefined).
|
||||
emqx_mountpoint:replvar(maps:get(mountpoint, Credentials, undefined), Credentials).
|
||||
|
||||
do_check_banned(_EnableBan = true, Credentials) ->
|
||||
case emqx_banned:check(Credentials) of
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
|
||||
-module(emqx_topic).
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
%% APIs
|
||||
-export([ match/2
|
||||
, validate/1
|
||||
|
@ -33,19 +31,23 @@
|
|||
, parse/2
|
||||
]).
|
||||
|
||||
-export_type([ group/0
|
||||
, topic/0
|
||||
, word/0
|
||||
, triple/0
|
||||
]).
|
||||
|
||||
-type(group() :: binary()).
|
||||
-type(topic() :: binary()).
|
||||
-type(word() :: '' | '+' | '#' | binary()).
|
||||
-type(words() :: list(word())).
|
||||
-opaque(triple() :: {root | binary(), word(), binary()}).
|
||||
|
||||
-export_type([group/0, topic/0, word/0, triple/0]).
|
||||
|
||||
-define(MAX_TOPIC_LEN, 4096).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Is wildcard topic?
|
||||
-spec(wildcard(topic() | words()) -> true | false).
|
||||
|
@ -60,7 +62,7 @@ wildcard(['+'|_]) ->
|
|||
wildcard([_H|T]) ->
|
||||
wildcard(T).
|
||||
|
||||
%% @doc Match Topic name with filter
|
||||
%% @doc Match Topic name with filter.
|
||||
-spec(match(Name, Filter) -> boolean() when
|
||||
Name :: topic() | words(),
|
||||
Filter :: topic() | words()).
|
||||
|
@ -68,7 +70,7 @@ match(<<$$, _/binary>>, <<$+, _/binary>>) ->
|
|||
false;
|
||||
match(<<$$, _/binary>>, <<$#, _/binary>>) ->
|
||||
false;
|
||||
match(Name, Filter) when is_binary(Name) and is_binary(Filter) ->
|
||||
match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
|
||||
match(words(Name), words(Filter));
|
||||
match([], []) ->
|
||||
true;
|
||||
|
@ -95,13 +97,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter ->
|
|||
-spec(validate(name | filter, topic()) -> true).
|
||||
validate(_, <<>>) ->
|
||||
error(empty_topic);
|
||||
validate(_, Topic) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) ->
|
||||
validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
|
||||
error(topic_too_long);
|
||||
validate(filter, Topic) when is_binary(Topic) ->
|
||||
validate2(words(Topic));
|
||||
validate(name, Topic) when is_binary(Topic) ->
|
||||
Words = words(Topic),
|
||||
validate2(Words) and (not wildcard(Words)).
|
||||
validate2(Words)
|
||||
andalso (not wildcard(Words))
|
||||
orelse error(topic_name_error).
|
||||
|
||||
validate2([]) ->
|
||||
true;
|
||||
|
@ -123,7 +127,7 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
|
|||
validate3(<<_/utf8, Rest/binary>>) ->
|
||||
validate3(Rest).
|
||||
|
||||
%% @doc Topic to triples
|
||||
%% @doc Topic to triples.
|
||||
-spec(triples(topic()) -> list(triple())).
|
||||
triples(Topic) when is_binary(Topic) ->
|
||||
triples(words(Topic), root, []).
|
||||
|
@ -206,27 +210,27 @@ join(Words) ->
|
|||
end, {true, <<>>}, [bin(W) || W <- Words]),
|
||||
Bin.
|
||||
|
||||
-spec(parse(topic()) -> {topic(), #{}}).
|
||||
parse(Topic) when is_binary(Topic) ->
|
||||
parse(Topic, #{}).
|
||||
-spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}).
|
||||
parse(TopicFilter) when is_binary(TopicFilter) ->
|
||||
parse(TopicFilter, #{});
|
||||
parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
|
||||
parse(TopicFilter, Options).
|
||||
|
||||
parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) ->
|
||||
error({invalid_topic, Topic});
|
||||
parse(Topic = <<?SHARE, "/", _/binary>>, #{share := _Group}) ->
|
||||
error({invalid_topic, Topic});
|
||||
parse(<<"$queue/", Topic1/binary>>, Options) ->
|
||||
parse(Topic1, maps:put(share, <<"$queue">>, Options));
|
||||
parse(Topic = <<?SHARE, "/", Topic1/binary>>, Options) ->
|
||||
case binary:split(Topic1, <<"/">>) of
|
||||
[<<>>] -> error({invalid_topic, Topic});
|
||||
[_] -> error({invalid_topic, Topic});
|
||||
[Group, Topic2] ->
|
||||
case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of
|
||||
nomatch -> {Topic2, maps:put(share, Group, Options)};
|
||||
_ -> error({invalid_topic, Topic})
|
||||
-spec(parse(topic(), map()) -> {topic(), map()}).
|
||||
parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
|
||||
error({invalid_topic_filter, TopicFilter});
|
||||
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
|
||||
error({invalid_topic_filter, TopicFilter});
|
||||
parse(<<"$queue/", TopicFilter/binary>>, Options) ->
|
||||
parse(TopicFilter, Options#{share => <<"$queue">>});
|
||||
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
|
||||
case binary:split(Rest, <<"/">>) of
|
||||
[_Any] -> error({invalid_topic_filter, TopicFilter});
|
||||
[ShareName, Filter] ->
|
||||
case binary:match(ShareName, [<<"+">>, <<"#">>]) of
|
||||
nomatch -> parse(Filter, Options#{share => ShareName});
|
||||
_ -> error({invalid_topic_filter, TopicFilter})
|
||||
end
|
||||
end;
|
||||
parse(Topic, Options = #{qos := QoS}) ->
|
||||
{Topic, Options#{rc => QoS}};
|
||||
parse(Topic, Options) ->
|
||||
{Topic, Options}.
|
||||
parse(TopicFilter, Options) ->
|
||||
{TopicFilter, Options}.
|
Loading…
Reference in New Issue