From 04c87c06ccc7d7d79c65b0fd1c1de29c5bf9a780 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 19 Jan 2015 17:36:54 +0800 Subject: [PATCH] rewrite topic functions --- apps/emqtt/src/emqtt_broker.erl | 4 + apps/emqtt/src/emqtt_http.erl | 2 +- apps/emqtt/src/emqtt_oldtopic.erl | 19 +++ apps/emqtt/src/emqtt_plugin.erl | 1 + apps/emqtt/src/emqtt_protocol.erl | 10 +- apps/emqtt/src/emqtt_topic.erl | 173 +++++++++++++++----------- apps/emqtt/test/emqtt_topic_tests.erl | 45 ++++++- 7 files changed, 172 insertions(+), 82 deletions(-) create mode 100644 apps/emqtt/src/emqtt_broker.erl create mode 100644 apps/emqtt/src/emqtt_oldtopic.erl create mode 100644 apps/emqtt/src/emqtt_plugin.erl diff --git a/apps/emqtt/src/emqtt_broker.erl b/apps/emqtt/src/emqtt_broker.erl new file mode 100644 index 000000000..00adbb1c6 --- /dev/null +++ b/apps/emqtt/src/emqtt_broker.erl @@ -0,0 +1,4 @@ + +-module(emqtt_broker). + + diff --git a/apps/emqtt/src/emqtt_http.erl b/apps/emqtt/src/emqtt_http.erl index dfb19e430..5bececee1 100644 --- a/apps/emqtt/src/emqtt_http.erl +++ b/apps/emqtt/src/emqtt_http.erl @@ -83,7 +83,7 @@ validate(qos, Qos) -> (Qos >= ?QOS_0) and (Qos =< ?QOS_2); validate(topic, Topic) -> - emqtt_topic:validate({publish, Topic}). + emqtt_topic:validate({name, Topic}). int(S) -> list_to_integer(S). diff --git a/apps/emqtt/src/emqtt_oldtopic.erl b/apps/emqtt/src/emqtt_oldtopic.erl new file mode 100644 index 000000000..87c9eb3e9 --- /dev/null +++ b/apps/emqtt/src/emqtt_oldtopic.erl @@ -0,0 +1,19 @@ +-module(emqtt_oldtopic). + +-export([triples/1]). + +triples(B) when is_binary(B) -> + triples(binary_to_list(B), []). + +triples(S, Acc) -> + triples(string:rchr(S, $/), S, Acc). + +triples(0, S, Acc) -> + [{root, l2b(S), l2b(S)}|Acc]; + +triples(I, S, Acc) -> + S1 = string:substr(S, 1, I-1), + S2 = string:substr(S, I+1), + triples(S1, [{l2b(S1), l2b(S2), l2b(S)}|Acc]). + +l2b(L) -> list_to_binary(L). diff --git a/apps/emqtt/src/emqtt_plugin.erl b/apps/emqtt/src/emqtt_plugin.erl new file mode 100644 index 000000000..21ce710f2 --- /dev/null +++ b/apps/emqtt/src/emqtt_plugin.erl @@ -0,0 +1 @@ +-module(emqtt_plugin). diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index d0857c554..5da781f33 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -317,7 +317,7 @@ validate_clientid(#mqtt_packet_connect { proto_ver = Ver, clean_sess = CleanSess validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?PUBLISH }, variable = #mqtt_packet_publish{ topic_name = Topic }}) -> - case emqtt_topic:validate({publish, Topic}) of + case emqtt_topic:validate({name, Topic}) of true -> ok; false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic} end; @@ -325,21 +325,21 @@ validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?PUBLISH } validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBSCRIBE }, variable = #mqtt_packet_subscribe{topic_table = Topics }}) -> - validate_topics(subscribe, Topics); + validate_topics(filter, Topics); validate_packet(#mqtt_packet{ header = #mqtt_packet_header { type = ?UNSUBSCRIBE }, variable = #mqtt_packet_subscribe{ topic_table = Topics }}) -> - validate_topics(unsubscribe, Topics); + validate_topics(filter, Topics); validate_packet(_Packet) -> ok. -validate_topics(Type, []) when Type =:= subscribe orelse Type =:= unsubscribe -> +validate_topics(Type, []) when Type =:= name orelse Type =:= filter -> lager:error("Empty Topics!"), {error, empty_topics}; -validate_topics(Type, Topics) when Type =:= subscribe orelse Type =:= unsubscribe -> +validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter -> ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics, not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))], case ErrTopics of diff --git a/apps/emqtt/src/emqtt_topic.erl b/apps/emqtt/src/emqtt_topic.erl index d9ba5d41b..d04ca6aa0 100644 --- a/apps/emqtt/src/emqtt_topic.erl +++ b/apps/emqtt/src/emqtt_topic.erl @@ -26,8 +26,6 @@ -import(lists, [reverse/1]). --import(string, [rchr/2, substr/2, substr/3]). - %% ------------------------------------------------------------------------ %% Topic semantics and usage %% ------------------------------------------------------------------------ @@ -50,49 +48,61 @@ -include("emqtt_topic.hrl"). --export([new/1, - type/1, - match/2, - validate/1, - triples/1, - words/1]). +-export([new/1, type/1, match/2, validate/1, triples/1, words/1]). --define(MAX_LEN, 1024). +%%---------------------------------------------------------------------------- --spec new(Name :: binary()) -> topic(). +-ifdef(use_specs). + +-spec new( binary() ) -> topic(). + +-spec type(topic() | binary()) -> direct | wildcard. + +-spec match(binary(), binary()) -> boolean(). + +-spec validate({name | filter, binary()}) -> boolean(). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(MAX_TOPIC_LEN, 65535). + +%% ------------------------------------------------------------------------ +%% New Topic +%% ------------------------------------------------------------------------ new(Name) when is_binary(Name) -> - #topic{name=Name, node=node()}. + #topic{ name = Name, node = node() }. %% ------------------------------------------------------------------------ -%% topic type: direct or wildcard +%% Topic Type: direct or wildcard %% ------------------------------------------------------------------------ --spec type(Topic :: topic()) -> direct | wildcard. -type(#topic{name=Name}) when is_binary(Name) -> - type(words(Name)); -type([]) -> - direct; -type([<<>>|T]) -> - type(T); -type([<<$#, _/binary>>|_]) -> - wildcard; -type([<<$+, _/binary>>|_]) -> - wildcard; -type([_|T]) -> - type(T). +type(#topic{ name = Name }) when is_binary(Name) -> + type(Name); +type(Topic) when is_binary(Topic) -> + type2(words(Topic)). + +type2([]) -> + direct; +type2(['#'|_]) -> + wildcard; +type2(['+'|_]) -> + wildcard; +type2([_H |T]) -> + type2(T). %% ------------------------------------------------------------------------ -%% topic match +%% Match Topic. B1 is Topic Name, B2 is Topic Filter. %% ------------------------------------------------------------------------ --spec match(B1 :: binary(), B2 :: binary()) -> boolean(). -match(B1, B2) when is_binary(B1) and is_binary(B2) -> - match(words(B1), words(B2)); +match(Name, Filter) when is_binary(Name) and is_binary(Filter) -> + match(words(Name), words(Filter)); match([], []) -> true; match([H|T1], [H|T2]) -> match(T1, T2); -match([_H|T1], [<<"+">>|T2]) -> +match([_H|T1], ['+'|T2]) -> match(T1, T2); -match(_, [<<"#">>]) -> +match(_, ['#']) -> true; match([_H1|_], [_H2|_]) -> false; @@ -100,57 +110,78 @@ match([], [_H|_T2]) -> false. %% ------------------------------------------------------------------------ -%% topic validate +%% Validate Topic %% ------------------------------------------------------------------------ --spec validate({Type :: subscribe | publish, Topic :: binary()}) -> boolean(). validate({_, <<>>}) -> false; -validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_LEN) -> +validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) -> false; -validate({subscribe, Topic}) when is_binary(Topic) -> - valid(words(Topic)); -validate({publish, Topic}) when is_binary(Topic) -> +validate({filter, Topic}) when is_binary(Topic) -> + validate2(words(Topic)); +validate({name, Topic}) when is_binary(Topic) -> Words = words(Topic), - valid(Words) and (not include_wildcard(Topic)). + validate2(Words) and (not include_wildcard(Words)). -triples(B) when is_binary(B) -> - triples(binary_to_list(B), []). +validate2([]) -> + true; +validate2(['#']) -> % end with '#' + true; +validate2(['#'|Words]) when length(Words) > 0 -> + false; +validate2([''|Words]) -> + validate2(Words); +validate2(['+'|Words]) -> + validate2(Words); +validate2([W|Words]) -> + case validate3(W) of + true -> validate2(Words); + false -> false + end. -triples(S, Acc) -> - triples(rchr(S, $/), S, Acc). +validate3(<<>>) -> + true; +validate3(<>) when C == $#; C == $+; C == 0 -> + false; +validate3(<<_/utf8, Rest/binary>>) -> + validate3(Rest). -triples(0, S, Acc) -> - [{root, l2b(S), l2b(S)}|Acc]; +include_wildcard([]) -> false; +include_wildcard(['#'|_T]) -> true; +include_wildcard(['+'|_T]) -> true; +include_wildcard([ _ | T]) -> include_wildcard(T). -triples(I, S, Acc) -> - S1 = substr(S, 1, I-1), - S2 = substr(S, I+1), - triples(S1, [{l2b(S1), l2b(S2), l2b(S)}|Acc]). +%% ------------------------------------------------------------------------ +%% Topic to Triples +%% ------------------------------------------------------------------------ +triples(Topic) when is_binary(Topic) -> + triples(words(Topic), root, []). +triples([], _Parent, Acc) -> + reverse(Acc); + +triples([W|Words], Parent, Acc) -> + Node = join(Parent, W), + triples(Words, Node, [{Parent, W, Node}|Acc]). + +join(root, W) -> + W; +join(Parent, W) -> + <<(bin(Parent))/binary, $/, (bin(W))/binary>>. + +bin('') -> <<>>; +bin('+') -> <<"+">>; +bin('#') -> <<"#">>; +bin( B ) when is_binary(B) -> B. + +%% ------------------------------------------------------------------------ +%% Split Topic to Words +%% ------------------------------------------------------------------------ words(Topic) when is_binary(Topic) -> - words(binary_to_list(Topic), [], []). + [word(W) || W <- binary:split(Topic, <<"/">>, [global])]. -words([], Word, ResAcc) -> - reverse([l2b(reverse(W)) || W <- [Word|ResAcc]]); +word(<<>>) -> ''; +word(<<"+">>) -> '+'; +word(<<"#">>) -> '#'; +word(Bin) -> Bin. -words([$/|Topic], Word, ResAcc) -> - words(Topic, [], [Word|ResAcc]); - -words([C|Topic], Word, ResAcc) -> - words(Topic, [C|Word], ResAcc). - -valid([<<>>|Words]) -> valid2(Words); -valid(Words) -> valid2(Words). - -valid2([<<>>|_Words]) -> false; -valid2([<<"#">>|Words]) when length(Words) > 0 -> false; -valid2([_|Words]) -> valid2(Words); -valid2([]) -> true. - -include_wildcard(<<>>) -> false; -include_wildcard(<<$#, _T/binary>>) -> true; -include_wildcard(<<$+, _T/binary>>) -> true; -include_wildcard(<<_H, T/binary>>) -> include_wildcard(T). - -l2b(L) -> list_to_binary(L). diff --git a/apps/emqtt/test/emqtt_topic_tests.erl b/apps/emqtt/test/emqtt_topic_tests.erl index 6c353c4d4..07c19b2f5 100644 --- a/apps/emqtt/test/emqtt_topic_tests.erl +++ b/apps/emqtt/test/emqtt_topic_tests.erl @@ -30,17 +30,52 @@ -include_lib("eunit/include/eunit.hrl"). +-define(N, 100000). + validate_test() -> - ?assert( validate({subscribe, <<"a/b/c">>}) ), - ?assert( validate({subscribe, <<"/a/b">>}) ), - ?assert( validate({subscribe, <<"/+/x">>}) ), - ?assert( validate({subscribe, <<"/a/b/c/#">>}) ), - ?assertNot( validate({subscribe, <<"a/#/c">>}) ). + ?assert( validate({filter, <<"a/b/c">>}) ), + ?assert( validate({filter, <<"/a/b">>}) ), + ?assert( validate({filter, <<"/+/x">>}) ), + ?assert( validate({filter, <<"/a/b/c/#">>}) ), + ?assertNot( validate({filter, <<"a/#/c">>}) ). + +match_test() -> + ?assert( match(<<"a/b/ccc">>, <<"a/#">>) ), + Name = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>, + Filter = <<"/abkc/19383/+/akakdkkdkak/#">>, + ?assert( match(Name, Filter) ), + ?debugFmt("Match ~p with ~p", [Name, Filter]), + {Time, _} = timer:tc(fun() -> + [match(Name, Filter) || _I <- lists:seq(1, ?N)] + end), + ?debugFmt("Time for match: ~p(micro)", [Time/?N]), + ok. + +triples_test() -> + Topic = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>, + {Time, _} = timer:tc(fun() -> + [triples(Topic) || _I <- lists:seq(1, ?N)] + end), + ?debugFmt("Time for triples: ~p(micro)", [Time/?N]), + ok. type_test() -> ?assertEqual(direct, type(#topic{name = <<"/a/b/cdkd">>})), ?assertEqual(wildcard, type(#topic{name = <<"/a/+/d">>})), ?assertEqual(wildcard, type(#topic{name = <<"/a/b/#">>})). +words_test() -> + ?debugFmt("Words: ~p", [words(<<"/abkc/19383/+/akakdkkdkak/#">>)]), + ?assertMatch(['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'], words(<<"/abkc/19383/+/akakdkkdkak/#">>)), + {Time, _} = timer:tc(fun() -> + [words(<<"/abkc/19383/+/akakdkkdkak/#">>) || _I <- lists:seq(1, ?N)] + end), + ?debugFmt("Time for words: ~p(micro)", [Time/?N]), + {Time2, _} = timer:tc(fun() -> + [binary:split(<<"/abkc/19383/+/akakdkkdkak/#">>, <<"/">>, [global]) || _I <- lists:seq(1, ?N)] + end), + ?debugFmt("Time for binary:split: ~p(micro)", [Time2/?N]), + ok. + -endif.