Merge pull request #2758 from emqx/merge_3.3

Fix unmount crash
This commit is contained in:
turtleDeng 2019-08-12 09:37:15 +08:00 committed by GitHub
commit 58ba22dfc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 49 deletions

View File

@ -15,9 +15,7 @@
-module(emqx_mountpoint). -module(emqx_mountpoint).
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("types.hrl").
-logger_header("[Mountpoint]").
-export([ mount/2 -export([ mount/2
, unmount/2 , unmount/2
@ -32,30 +30,34 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
mount(undefined, Any) -> mount(undefined, Any) ->
Any; Any;
mount(MountPoint, Topic) when is_binary(Topic) ->
prefix(MountPoint, Topic);
mount(MountPoint, Msg = #message{topic = Topic}) -> mount(MountPoint, Msg = #message{topic = Topic}) ->
Msg#message{topic = <<MountPoint/binary, Topic/binary>>}; Msg#message{topic = prefix(MountPoint, Topic)};
mount(MountPoint, TopicFilters) when is_list(TopicFilters) -> mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
[{<<MountPoint/binary, Topic/binary>>, SubOpts} || {Topic, SubOpts} <- TopicFilters]. [{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].
unmount(undefined, Msg) -> unmount(undefined, Any) ->
Msg; Any;
unmount(MountPoint, Topic) when is_binary(Topic) ->
case string:prefix(Topic, MountPoint) of
nomatch -> Topic;
Topic1 -> Topic1
end;
unmount(MountPoint, Msg = #message{topic = Topic}) -> unmount(MountPoint, Msg = #message{topic = Topic}) ->
try split_binary(Topic, byte_size(MountPoint)) of case string:prefix(Topic, MountPoint) of
{MountPoint, Topic1} -> Msg#message{topic = Topic1} nomatch -> Msg;
catch Topic1 -> Msg#message{topic = Topic1}
_Error:Reason ->
?LOG(error, "Unmount error : ~p", [Reason]),
Msg
end. end.
-spec(replvar(maybe(mountpoint()), map()) -> maybe(mountpoint())).
replvar(undefined, _Vars) -> replvar(undefined, _Vars) ->
undefined; undefined;
replvar(MountPoint, #{client_id := ClientId, username := Username}) -> replvar(MountPoint, #{client_id := ClientId, username := Username}) ->
lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]). lists:foldl(fun feed_var/2, MountPoint,
[{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
feed_var({<<"%c">>, ClientId}, MountPoint) -> feed_var({<<"%c">>, ClientId}, MountPoint) ->
emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint); emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
@ -64,3 +66,5 @@ feed_var({<<"%u">>, undefined}, MountPoint) ->
feed_var({<<"%u">>, Username}, MountPoint) -> feed_var({<<"%u">>, Username}, MountPoint) ->
emqx_topic:feed_var(<<"%u">>, Username, MountPoint). emqx_topic:feed_var(<<"%u">>, Username, MountPoint).
prefix(MountPoint, Topic) ->
<<MountPoint/binary, Topic/binary>>.

View File

@ -1027,7 +1027,7 @@ raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridg
end. end.
mountpoint(Credentials) -> mountpoint(Credentials) ->
maps:get(mountpoint, Credentials, undefined). emqx_mountpoint:replvar(maps:get(mountpoint, Credentials, undefined), Credentials).
do_check_banned(_EnableBan = true, Credentials) -> do_check_banned(_EnableBan = true, Credentials) ->
case emqx_banned:check(Credentials) of case emqx_banned:check(Credentials) of

View File

@ -14,8 +14,6 @@
-module(emqx_topic). -module(emqx_topic).
-include("emqx_mqtt.hrl").
%% APIs %% APIs
-export([ match/2 -export([ match/2
, validate/1 , validate/1
@ -33,19 +31,23 @@
, parse/2 , parse/2
]). ]).
-export_type([ group/0
, topic/0
, word/0
, triple/0
]).
-type(group() :: binary()). -type(group() :: binary()).
-type(topic() :: binary()). -type(topic() :: binary()).
-type(word() :: '' | '+' | '#' | binary()). -type(word() :: '' | '+' | '#' | binary()).
-type(words() :: list(word())). -type(words() :: list(word())).
-opaque(triple() :: {root | binary(), word(), binary()}). -opaque(triple() :: {root | binary(), word(), binary()}).
-export_type([group/0, topic/0, word/0, triple/0]).
-define(MAX_TOPIC_LEN, 4096). -define(MAX_TOPIC_LEN, 4096).
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% APIs %% APIs
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% @doc Is wildcard topic? %% @doc Is wildcard topic?
-spec(wildcard(topic() | words()) -> true | false). -spec(wildcard(topic() | words()) -> true | false).
@ -60,7 +62,7 @@ wildcard(['+'|_]) ->
wildcard([_H|T]) -> wildcard([_H|T]) ->
wildcard(T). wildcard(T).
%% @doc Match Topic name with filter %% @doc Match Topic name with filter.
-spec(match(Name, Filter) -> boolean() when -spec(match(Name, Filter) -> boolean() when
Name :: topic() | words(), Name :: topic() | words(),
Filter :: topic() | words()). Filter :: topic() | words()).
@ -68,7 +70,7 @@ match(<<$$, _/binary>>, <<$+, _/binary>>) ->
false; false;
match(<<$$, _/binary>>, <<$#, _/binary>>) -> match(<<$$, _/binary>>, <<$#, _/binary>>) ->
false; false;
match(Name, Filter) when is_binary(Name) and is_binary(Filter) -> match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
match(words(Name), words(Filter)); match(words(Name), words(Filter));
match([], []) -> match([], []) ->
true; true;
@ -95,13 +97,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter ->
-spec(validate(name | filter, topic()) -> true). -spec(validate(name | filter, topic()) -> true).
validate(_, <<>>) -> validate(_, <<>>) ->
error(empty_topic); error(empty_topic);
validate(_, Topic) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) -> validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
error(topic_too_long); error(topic_too_long);
validate(filter, Topic) when is_binary(Topic) -> validate(filter, Topic) when is_binary(Topic) ->
validate2(words(Topic)); validate2(words(Topic));
validate(name, Topic) when is_binary(Topic) -> validate(name, Topic) when is_binary(Topic) ->
Words = words(Topic), Words = words(Topic),
validate2(Words) and (not wildcard(Words)). validate2(Words)
andalso (not wildcard(Words))
orelse error(topic_name_error).
validate2([]) -> validate2([]) ->
true; true;
@ -123,7 +127,7 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
validate3(<<_/utf8, Rest/binary>>) -> validate3(<<_/utf8, Rest/binary>>) ->
validate3(Rest). validate3(Rest).
%% @doc Topic to triples %% @doc Topic to triples.
-spec(triples(topic()) -> list(triple())). -spec(triples(topic()) -> list(triple())).
triples(Topic) when is_binary(Topic) -> triples(Topic) when is_binary(Topic) ->
triples(words(Topic), root, []). triples(words(Topic), root, []).
@ -206,27 +210,29 @@ join(Words) ->
end, {true, <<>>}, [bin(W) || W <- Words]), end, {true, <<>>}, [bin(W) || W <- Words]),
Bin. Bin.
-spec(parse(topic()) -> {topic(), #{}}). -spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}).
parse(Topic) when is_binary(Topic) -> parse(TopicFilter) when is_binary(TopicFilter) ->
parse(Topic, #{}). parse(TopicFilter, #{});
parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
parse(TopicFilter, Options).
parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) -> -spec(parse(topic(), map()) -> {topic(), map()}).
error({invalid_topic, Topic}); parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
parse(Topic = <<?SHARE, "/", _/binary>>, #{share := _Group}) -> error({invalid_topic_filter, TopicFilter});
error({invalid_topic, Topic}); parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
parse(<<"$queue/", Topic1/binary>>, Options) -> error({invalid_topic_filter, TopicFilter});
parse(Topic1, maps:put(share, <<"$queue">>, Options)); parse(<<"$queue/", TopicFilter/binary>>, Options) ->
parse(Topic = <<?SHARE, "/", Topic1/binary>>, Options) -> parse(TopicFilter, Options#{share => <<"$queue">>});
case binary:split(Topic1, <<"/">>) of parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
[<<>>] -> error({invalid_topic, Topic}); case binary:split(Rest, <<"/">>) of
[_] -> error({invalid_topic, Topic}); [_Any] -> error({invalid_topic_filter, TopicFilter});
[Group, Topic2] -> [ShareName, Filter] ->
case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of case binary:match(ShareName, [<<"+">>, <<"#">>]) of
nomatch -> {Topic2, maps:put(share, Group, Options)}; nomatch -> parse(Filter, Options#{share => ShareName});
_ -> error({invalid_topic, Topic}) _ -> error({invalid_topic_filter, TopicFilter})
end end
end; end;
parse(Topic, Options = #{qos := QoS}) -> parse(TopicFilter, Options = #{qos := QoS}) ->
{Topic, Options#{rc => QoS}}; {TopicFilter, Options#{rc => QoS}};
parse(Topic, Options) -> parse(TopicFilter, Options) ->
{Topic, Options}. {TopicFilter, Options}.