rewrite topic functions
This commit is contained in:
parent
0be1ca8cc6
commit
04c87c06cc
|
@ -0,0 +1,4 @@
|
||||||
|
|
||||||
|
-module(emqtt_broker).
|
||||||
|
|
||||||
|
|
|
@ -83,7 +83,7 @@ validate(qos, Qos) ->
|
||||||
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
|
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
|
||||||
|
|
||||||
validate(topic, Topic) ->
|
validate(topic, Topic) ->
|
||||||
emqtt_topic:validate({publish, Topic}).
|
emqtt_topic:validate({name, Topic}).
|
||||||
|
|
||||||
int(S) -> list_to_integer(S).
|
int(S) -> list_to_integer(S).
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
-module(emqtt_oldtopic).
|
||||||
|
|
||||||
|
-export([triples/1]).
|
||||||
|
|
||||||
|
triples(B) when is_binary(B) ->
|
||||||
|
triples(binary_to_list(B), []).
|
||||||
|
|
||||||
|
triples(S, Acc) ->
|
||||||
|
triples(string:rchr(S, $/), S, Acc).
|
||||||
|
|
||||||
|
triples(0, S, Acc) ->
|
||||||
|
[{root, l2b(S), l2b(S)}|Acc];
|
||||||
|
|
||||||
|
triples(I, S, Acc) ->
|
||||||
|
S1 = string:substr(S, 1, I-1),
|
||||||
|
S2 = string:substr(S, I+1),
|
||||||
|
triples(S1, [{l2b(S1), l2b(S2), l2b(S)}|Acc]).
|
||||||
|
|
||||||
|
l2b(L) -> list_to_binary(L).
|
|
@ -0,0 +1 @@
|
||||||
|
-module(emqtt_plugin).
|
|
@ -317,7 +317,7 @@ validate_clientid(#mqtt_packet_connect { proto_ver = Ver, clean_sess = CleanSess
|
||||||
|
|
||||||
validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?PUBLISH },
|
validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?PUBLISH },
|
||||||
variable = #mqtt_packet_publish{ topic_name = Topic }}) ->
|
variable = #mqtt_packet_publish{ topic_name = Topic }}) ->
|
||||||
case emqtt_topic:validate({publish, Topic}) of
|
case emqtt_topic:validate({name, Topic}) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
|
false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
|
||||||
end;
|
end;
|
||||||
|
@ -325,21 +325,21 @@ validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?PUBLISH }
|
||||||
validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBSCRIBE },
|
validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBSCRIBE },
|
||||||
variable = #mqtt_packet_subscribe{topic_table = Topics }}) ->
|
variable = #mqtt_packet_subscribe{topic_table = Topics }}) ->
|
||||||
|
|
||||||
validate_topics(subscribe, Topics);
|
validate_topics(filter, Topics);
|
||||||
|
|
||||||
validate_packet(#mqtt_packet{ header = #mqtt_packet_header { type = ?UNSUBSCRIBE },
|
validate_packet(#mqtt_packet{ header = #mqtt_packet_header { type = ?UNSUBSCRIBE },
|
||||||
variable = #mqtt_packet_subscribe{ topic_table = Topics }}) ->
|
variable = #mqtt_packet_subscribe{ topic_table = Topics }}) ->
|
||||||
|
|
||||||
validate_topics(unsubscribe, Topics);
|
validate_topics(filter, Topics);
|
||||||
|
|
||||||
validate_packet(_Packet) ->
|
validate_packet(_Packet) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
validate_topics(Type, []) when Type =:= subscribe orelse Type =:= unsubscribe ->
|
validate_topics(Type, []) when Type =:= name orelse Type =:= filter ->
|
||||||
lager:error("Empty Topics!"),
|
lager:error("Empty Topics!"),
|
||||||
{error, empty_topics};
|
{error, empty_topics};
|
||||||
|
|
||||||
validate_topics(Type, Topics) when Type =:= subscribe orelse Type =:= unsubscribe ->
|
validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
|
||||||
ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
|
ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
|
||||||
not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))],
|
not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))],
|
||||||
case ErrTopics of
|
case ErrTopics of
|
||||||
|
|
|
@ -26,8 +26,6 @@
|
||||||
|
|
||||||
-import(lists, [reverse/1]).
|
-import(lists, [reverse/1]).
|
||||||
|
|
||||||
-import(string, [rchr/2, substr/2, substr/3]).
|
|
||||||
|
|
||||||
%% ------------------------------------------------------------------------
|
%% ------------------------------------------------------------------------
|
||||||
%% Topic semantics and usage
|
%% Topic semantics and usage
|
||||||
%% ------------------------------------------------------------------------
|
%% ------------------------------------------------------------------------
|
||||||
|
@ -50,49 +48,61 @@
|
||||||
|
|
||||||
-include("emqtt_topic.hrl").
|
-include("emqtt_topic.hrl").
|
||||||
|
|
||||||
-export([new/1,
|
-export([new/1, type/1, match/2, validate/1, triples/1, words/1]).
|
||||||
type/1,
|
|
||||||
match/2,
|
|
||||||
validate/1,
|
|
||||||
triples/1,
|
|
||||||
words/1]).
|
|
||||||
|
|
||||||
-define(MAX_LEN, 1024).
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
-spec new(Name :: binary()) -> topic().
|
-ifdef(use_specs).
|
||||||
|
|
||||||
|
-spec new( binary() ) -> topic().
|
||||||
|
|
||||||
|
-spec type(topic() | binary()) -> direct | wildcard.
|
||||||
|
|
||||||
|
-spec match(binary(), binary()) -> boolean().
|
||||||
|
|
||||||
|
-spec validate({name | filter, binary()}) -> boolean().
|
||||||
|
|
||||||
|
-endif.
|
||||||
|
|
||||||
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(MAX_TOPIC_LEN, 65535).
|
||||||
|
|
||||||
|
%% ------------------------------------------------------------------------
|
||||||
|
%% New Topic
|
||||||
|
%% ------------------------------------------------------------------------
|
||||||
new(Name) when is_binary(Name) ->
|
new(Name) when is_binary(Name) ->
|
||||||
#topic{name=Name, node=node()}.
|
#topic{ name = Name, node = node() }.
|
||||||
|
|
||||||
%% ------------------------------------------------------------------------
|
%% ------------------------------------------------------------------------
|
||||||
%% topic type: direct or wildcard
|
%% Topic Type: direct or wildcard
|
||||||
%% ------------------------------------------------------------------------
|
%% ------------------------------------------------------------------------
|
||||||
-spec type(Topic :: topic()) -> direct | wildcard.
|
type(#topic{ name = Name }) when is_binary(Name) ->
|
||||||
type(#topic{name=Name}) when is_binary(Name) ->
|
type(Name);
|
||||||
type(words(Name));
|
type(Topic) when is_binary(Topic) ->
|
||||||
type([]) ->
|
type2(words(Topic)).
|
||||||
direct;
|
|
||||||
type([<<>>|T]) ->
|
type2([]) ->
|
||||||
type(T);
|
direct;
|
||||||
type([<<$#, _/binary>>|_]) ->
|
type2(['#'|_]) ->
|
||||||
wildcard;
|
wildcard;
|
||||||
type([<<$+, _/binary>>|_]) ->
|
type2(['+'|_]) ->
|
||||||
wildcard;
|
wildcard;
|
||||||
type([_|T]) ->
|
type2([_H |T]) ->
|
||||||
type(T).
|
type2(T).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------------
|
%% ------------------------------------------------------------------------
|
||||||
%% topic match
|
%% Match Topic. B1 is Topic Name, B2 is Topic Filter.
|
||||||
%% ------------------------------------------------------------------------
|
%% ------------------------------------------------------------------------
|
||||||
-spec match(B1 :: binary(), B2 :: binary()) -> boolean().
|
match(Name, Filter) when is_binary(Name) and is_binary(Filter) ->
|
||||||
match(B1, B2) when is_binary(B1) and is_binary(B2) ->
|
match(words(Name), words(Filter));
|
||||||
match(words(B1), words(B2));
|
|
||||||
match([], []) ->
|
match([], []) ->
|
||||||
true;
|
true;
|
||||||
match([H|T1], [H|T2]) ->
|
match([H|T1], [H|T2]) ->
|
||||||
match(T1, T2);
|
match(T1, T2);
|
||||||
match([_H|T1], [<<"+">>|T2]) ->
|
match([_H|T1], ['+'|T2]) ->
|
||||||
match(T1, T2);
|
match(T1, T2);
|
||||||
match(_, [<<"#">>]) ->
|
match(_, ['#']) ->
|
||||||
true;
|
true;
|
||||||
match([_H1|_], [_H2|_]) ->
|
match([_H1|_], [_H2|_]) ->
|
||||||
false;
|
false;
|
||||||
|
@ -100,57 +110,78 @@ match([], [_H|_T2]) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
%% ------------------------------------------------------------------------
|
%% ------------------------------------------------------------------------
|
||||||
%% topic validate
|
%% Validate Topic
|
||||||
%% ------------------------------------------------------------------------
|
%% ------------------------------------------------------------------------
|
||||||
-spec validate({Type :: subscribe | publish, Topic :: binary()}) -> boolean().
|
|
||||||
validate({_, <<>>}) ->
|
validate({_, <<>>}) ->
|
||||||
false;
|
false;
|
||||||
validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_LEN) ->
|
validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) ->
|
||||||
false;
|
false;
|
||||||
validate({subscribe, Topic}) when is_binary(Topic) ->
|
validate({filter, Topic}) when is_binary(Topic) ->
|
||||||
valid(words(Topic));
|
validate2(words(Topic));
|
||||||
validate({publish, Topic}) when is_binary(Topic) ->
|
validate({name, Topic}) when is_binary(Topic) ->
|
||||||
Words = words(Topic),
|
Words = words(Topic),
|
||||||
valid(Words) and (not include_wildcard(Topic)).
|
validate2(Words) and (not include_wildcard(Words)).
|
||||||
|
|
||||||
triples(B) when is_binary(B) ->
|
validate2([]) ->
|
||||||
triples(binary_to_list(B), []).
|
true;
|
||||||
|
validate2(['#']) -> % end with '#'
|
||||||
|
true;
|
||||||
|
validate2(['#'|Words]) when length(Words) > 0 ->
|
||||||
|
false;
|
||||||
|
validate2([''|Words]) ->
|
||||||
|
validate2(Words);
|
||||||
|
validate2(['+'|Words]) ->
|
||||||
|
validate2(Words);
|
||||||
|
validate2([W|Words]) ->
|
||||||
|
case validate3(W) of
|
||||||
|
true -> validate2(Words);
|
||||||
|
false -> false
|
||||||
|
end.
|
||||||
|
|
||||||
triples(S, Acc) ->
|
validate3(<<>>) ->
|
||||||
triples(rchr(S, $/), S, Acc).
|
true;
|
||||||
|
validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
|
||||||
|
false;
|
||||||
|
validate3(<<_/utf8, Rest/binary>>) ->
|
||||||
|
validate3(Rest).
|
||||||
|
|
||||||
triples(0, S, Acc) ->
|
include_wildcard([]) -> false;
|
||||||
[{root, l2b(S), l2b(S)}|Acc];
|
include_wildcard(['#'|_T]) -> true;
|
||||||
|
include_wildcard(['+'|_T]) -> true;
|
||||||
|
include_wildcard([ _ | T]) -> include_wildcard(T).
|
||||||
|
|
||||||
triples(I, S, Acc) ->
|
%% ------------------------------------------------------------------------
|
||||||
S1 = substr(S, 1, I-1),
|
%% Topic to Triples
|
||||||
S2 = substr(S, I+1),
|
%% ------------------------------------------------------------------------
|
||||||
triples(S1, [{l2b(S1), l2b(S2), l2b(S)}|Acc]).
|
triples(Topic) when is_binary(Topic) ->
|
||||||
|
triples(words(Topic), root, []).
|
||||||
|
|
||||||
|
triples([], _Parent, Acc) ->
|
||||||
|
reverse(Acc);
|
||||||
|
|
||||||
|
triples([W|Words], Parent, Acc) ->
|
||||||
|
Node = join(Parent, W),
|
||||||
|
triples(Words, Node, [{Parent, W, Node}|Acc]).
|
||||||
|
|
||||||
|
join(root, W) ->
|
||||||
|
W;
|
||||||
|
join(Parent, W) ->
|
||||||
|
<<(bin(Parent))/binary, $/, (bin(W))/binary>>.
|
||||||
|
|
||||||
|
bin('') -> <<>>;
|
||||||
|
bin('+') -> <<"+">>;
|
||||||
|
bin('#') -> <<"#">>;
|
||||||
|
bin( B ) when is_binary(B) -> B.
|
||||||
|
|
||||||
|
%% ------------------------------------------------------------------------
|
||||||
|
%% Split Topic to Words
|
||||||
|
%% ------------------------------------------------------------------------
|
||||||
words(Topic) when is_binary(Topic) ->
|
words(Topic) when is_binary(Topic) ->
|
||||||
words(binary_to_list(Topic), [], []).
|
[word(W) || W <- binary:split(Topic, <<"/">>, [global])].
|
||||||
|
|
||||||
words([], Word, ResAcc) ->
|
word(<<>>) -> '';
|
||||||
reverse([l2b(reverse(W)) || W <- [Word|ResAcc]]);
|
word(<<"+">>) -> '+';
|
||||||
|
word(<<"#">>) -> '#';
|
||||||
|
word(Bin) -> Bin.
|
||||||
|
|
||||||
words([$/|Topic], Word, ResAcc) ->
|
|
||||||
words(Topic, [], [Word|ResAcc]);
|
|
||||||
|
|
||||||
words([C|Topic], Word, ResAcc) ->
|
|
||||||
words(Topic, [C|Word], ResAcc).
|
|
||||||
|
|
||||||
valid([<<>>|Words]) -> valid2(Words);
|
|
||||||
valid(Words) -> valid2(Words).
|
|
||||||
|
|
||||||
valid2([<<>>|_Words]) -> false;
|
|
||||||
valid2([<<"#">>|Words]) when length(Words) > 0 -> false;
|
|
||||||
valid2([_|Words]) -> valid2(Words);
|
|
||||||
valid2([]) -> true.
|
|
||||||
|
|
||||||
include_wildcard(<<>>) -> false;
|
|
||||||
include_wildcard(<<$#, _T/binary>>) -> true;
|
|
||||||
include_wildcard(<<$+, _T/binary>>) -> true;
|
|
||||||
include_wildcard(<<_H, T/binary>>) -> include_wildcard(T).
|
|
||||||
|
|
||||||
l2b(L) -> list_to_binary(L).
|
|
||||||
|
|
||||||
|
|
|
@ -30,17 +30,52 @@
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-define(N, 100000).
|
||||||
|
|
||||||
validate_test() ->
|
validate_test() ->
|
||||||
?assert( validate({subscribe, <<"a/b/c">>}) ),
|
?assert( validate({filter, <<"a/b/c">>}) ),
|
||||||
?assert( validate({subscribe, <<"/a/b">>}) ),
|
?assert( validate({filter, <<"/a/b">>}) ),
|
||||||
?assert( validate({subscribe, <<"/+/x">>}) ),
|
?assert( validate({filter, <<"/+/x">>}) ),
|
||||||
?assert( validate({subscribe, <<"/a/b/c/#">>}) ),
|
?assert( validate({filter, <<"/a/b/c/#">>}) ),
|
||||||
?assertNot( validate({subscribe, <<"a/#/c">>}) ).
|
?assertNot( validate({filter, <<"a/#/c">>}) ).
|
||||||
|
|
||||||
|
match_test() ->
|
||||||
|
?assert( match(<<"a/b/ccc">>, <<"a/#">>) ),
|
||||||
|
Name = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>,
|
||||||
|
Filter = <<"/abkc/19383/+/akakdkkdkak/#">>,
|
||||||
|
?assert( match(Name, Filter) ),
|
||||||
|
?debugFmt("Match ~p with ~p", [Name, Filter]),
|
||||||
|
{Time, _} = timer:tc(fun() ->
|
||||||
|
[match(Name, Filter) || _I <- lists:seq(1, ?N)]
|
||||||
|
end),
|
||||||
|
?debugFmt("Time for match: ~p(micro)", [Time/?N]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
triples_test() ->
|
||||||
|
Topic = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>,
|
||||||
|
{Time, _} = timer:tc(fun() ->
|
||||||
|
[triples(Topic) || _I <- lists:seq(1, ?N)]
|
||||||
|
end),
|
||||||
|
?debugFmt("Time for triples: ~p(micro)", [Time/?N]),
|
||||||
|
ok.
|
||||||
|
|
||||||
type_test() ->
|
type_test() ->
|
||||||
?assertEqual(direct, type(#topic{name = <<"/a/b/cdkd">>})),
|
?assertEqual(direct, type(#topic{name = <<"/a/b/cdkd">>})),
|
||||||
?assertEqual(wildcard, type(#topic{name = <<"/a/+/d">>})),
|
?assertEqual(wildcard, type(#topic{name = <<"/a/+/d">>})),
|
||||||
?assertEqual(wildcard, type(#topic{name = <<"/a/b/#">>})).
|
?assertEqual(wildcard, type(#topic{name = <<"/a/b/#">>})).
|
||||||
|
|
||||||
|
words_test() ->
|
||||||
|
?debugFmt("Words: ~p", [words(<<"/abkc/19383/+/akakdkkdkak/#">>)]),
|
||||||
|
?assertMatch(['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'], words(<<"/abkc/19383/+/akakdkkdkak/#">>)),
|
||||||
|
{Time, _} = timer:tc(fun() ->
|
||||||
|
[words(<<"/abkc/19383/+/akakdkkdkak/#">>) || _I <- lists:seq(1, ?N)]
|
||||||
|
end),
|
||||||
|
?debugFmt("Time for words: ~p(micro)", [Time/?N]),
|
||||||
|
{Time2, _} = timer:tc(fun() ->
|
||||||
|
[binary:split(<<"/abkc/19383/+/akakdkkdkak/#">>, <<"/">>, [global]) || _I <- lists:seq(1, ?N)]
|
||||||
|
end),
|
||||||
|
?debugFmt("Time for binary:split: ~p(micro)", [Time2/?N]),
|
||||||
|
ok.
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue