From 82a4312562e322464be3381c5b77cf02dff75a5b Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Sun, 18 Jan 2015 23:45:15 +0800 Subject: [PATCH 1/8] v3.1 --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8ea92ab3f..2c493838c 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # eMQTT -eMQTT is a clusterable, scalable, fault-tolerant and extensible MQTT V3.1.1 broker written in Erlang/OTP. +eMQTT is a clusterable, scalable, fault-tolerant and extensible MQTT V3.1/V3.1.1 broker written in Erlang/OTP. eMQTT support MQTT V3.1/V3.1.1 Protocol Specification. From 0be1ca8cc68b08214c2540e3164aa2d121671d76 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 19 Jan 2015 00:18:08 +0800 Subject: [PATCH 2/8] massively --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2c493838c..da05c6da2 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # eMQTT -eMQTT is a clusterable, scalable, fault-tolerant and extensible MQTT V3.1/V3.1.1 broker written in Erlang/OTP. +eMQTT is a clusterable, massively scalable, fault-tolerant and extensible MQTT V3.1/V3.1.1 broker written in Erlang/OTP. eMQTT support MQTT V3.1/V3.1.1 Protocol Specification. From 04c87c06ccc7d7d79c65b0fd1c1de29c5bf9a780 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 19 Jan 2015 17:36:54 +0800 Subject: [PATCH 3/8] rewrite topic functions --- apps/emqtt/src/emqtt_broker.erl | 4 + apps/emqtt/src/emqtt_http.erl | 2 +- apps/emqtt/src/emqtt_oldtopic.erl | 19 +++ apps/emqtt/src/emqtt_plugin.erl | 1 + apps/emqtt/src/emqtt_protocol.erl | 10 +- apps/emqtt/src/emqtt_topic.erl | 173 +++++++++++++++----------- apps/emqtt/test/emqtt_topic_tests.erl | 45 ++++++- 7 files changed, 172 insertions(+), 82 deletions(-) create mode 100644 apps/emqtt/src/emqtt_broker.erl create mode 100644 apps/emqtt/src/emqtt_oldtopic.erl create mode 100644 apps/emqtt/src/emqtt_plugin.erl diff --git a/apps/emqtt/src/emqtt_broker.erl b/apps/emqtt/src/emqtt_broker.erl new file mode 100644 index 000000000..00adbb1c6 --- /dev/null +++ b/apps/emqtt/src/emqtt_broker.erl @@ -0,0 +1,4 @@ + +-module(emqtt_broker). + + diff --git a/apps/emqtt/src/emqtt_http.erl b/apps/emqtt/src/emqtt_http.erl index dfb19e430..5bececee1 100644 --- a/apps/emqtt/src/emqtt_http.erl +++ b/apps/emqtt/src/emqtt_http.erl @@ -83,7 +83,7 @@ validate(qos, Qos) -> (Qos >= ?QOS_0) and (Qos =< ?QOS_2); validate(topic, Topic) -> - emqtt_topic:validate({publish, Topic}). + emqtt_topic:validate({name, Topic}). int(S) -> list_to_integer(S). diff --git a/apps/emqtt/src/emqtt_oldtopic.erl b/apps/emqtt/src/emqtt_oldtopic.erl new file mode 100644 index 000000000..87c9eb3e9 --- /dev/null +++ b/apps/emqtt/src/emqtt_oldtopic.erl @@ -0,0 +1,19 @@ +-module(emqtt_oldtopic). + +-export([triples/1]). + +triples(B) when is_binary(B) -> + triples(binary_to_list(B), []). + +triples(S, Acc) -> + triples(string:rchr(S, $/), S, Acc). + +triples(0, S, Acc) -> + [{root, l2b(S), l2b(S)}|Acc]; + +triples(I, S, Acc) -> + S1 = string:substr(S, 1, I-1), + S2 = string:substr(S, I+1), + triples(S1, [{l2b(S1), l2b(S2), l2b(S)}|Acc]). + +l2b(L) -> list_to_binary(L). diff --git a/apps/emqtt/src/emqtt_plugin.erl b/apps/emqtt/src/emqtt_plugin.erl new file mode 100644 index 000000000..21ce710f2 --- /dev/null +++ b/apps/emqtt/src/emqtt_plugin.erl @@ -0,0 +1 @@ +-module(emqtt_plugin). diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index d0857c554..5da781f33 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -317,7 +317,7 @@ validate_clientid(#mqtt_packet_connect { proto_ver = Ver, clean_sess = CleanSess validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?PUBLISH }, variable = #mqtt_packet_publish{ topic_name = Topic }}) -> - case emqtt_topic:validate({publish, Topic}) of + case emqtt_topic:validate({name, Topic}) of true -> ok; false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic} end; @@ -325,21 +325,21 @@ validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?PUBLISH } validate_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBSCRIBE }, variable = #mqtt_packet_subscribe{topic_table = Topics }}) -> - validate_topics(subscribe, Topics); + validate_topics(filter, Topics); validate_packet(#mqtt_packet{ header = #mqtt_packet_header { type = ?UNSUBSCRIBE }, variable = #mqtt_packet_subscribe{ topic_table = Topics }}) -> - validate_topics(unsubscribe, Topics); + validate_topics(filter, Topics); validate_packet(_Packet) -> ok. -validate_topics(Type, []) when Type =:= subscribe orelse Type =:= unsubscribe -> +validate_topics(Type, []) when Type =:= name orelse Type =:= filter -> lager:error("Empty Topics!"), {error, empty_topics}; -validate_topics(Type, Topics) when Type =:= subscribe orelse Type =:= unsubscribe -> +validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter -> ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics, not (emqtt_topic:validate({Type, Topic}) and validate_qos(Qos))], case ErrTopics of diff --git a/apps/emqtt/src/emqtt_topic.erl b/apps/emqtt/src/emqtt_topic.erl index d9ba5d41b..d04ca6aa0 100644 --- a/apps/emqtt/src/emqtt_topic.erl +++ b/apps/emqtt/src/emqtt_topic.erl @@ -26,8 +26,6 @@ -import(lists, [reverse/1]). --import(string, [rchr/2, substr/2, substr/3]). - %% ------------------------------------------------------------------------ %% Topic semantics and usage %% ------------------------------------------------------------------------ @@ -50,49 +48,61 @@ -include("emqtt_topic.hrl"). --export([new/1, - type/1, - match/2, - validate/1, - triples/1, - words/1]). +-export([new/1, type/1, match/2, validate/1, triples/1, words/1]). --define(MAX_LEN, 1024). +%%---------------------------------------------------------------------------- --spec new(Name :: binary()) -> topic(). +-ifdef(use_specs). + +-spec new( binary() ) -> topic(). + +-spec type(topic() | binary()) -> direct | wildcard. + +-spec match(binary(), binary()) -> boolean(). + +-spec validate({name | filter, binary()}) -> boolean(). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(MAX_TOPIC_LEN, 65535). + +%% ------------------------------------------------------------------------ +%% New Topic +%% ------------------------------------------------------------------------ new(Name) when is_binary(Name) -> - #topic{name=Name, node=node()}. + #topic{ name = Name, node = node() }. %% ------------------------------------------------------------------------ -%% topic type: direct or wildcard +%% Topic Type: direct or wildcard %% ------------------------------------------------------------------------ --spec type(Topic :: topic()) -> direct | wildcard. -type(#topic{name=Name}) when is_binary(Name) -> - type(words(Name)); -type([]) -> - direct; -type([<<>>|T]) -> - type(T); -type([<<$#, _/binary>>|_]) -> - wildcard; -type([<<$+, _/binary>>|_]) -> - wildcard; -type([_|T]) -> - type(T). +type(#topic{ name = Name }) when is_binary(Name) -> + type(Name); +type(Topic) when is_binary(Topic) -> + type2(words(Topic)). + +type2([]) -> + direct; +type2(['#'|_]) -> + wildcard; +type2(['+'|_]) -> + wildcard; +type2([_H |T]) -> + type2(T). %% ------------------------------------------------------------------------ -%% topic match +%% Match Topic. B1 is Topic Name, B2 is Topic Filter. %% ------------------------------------------------------------------------ --spec match(B1 :: binary(), B2 :: binary()) -> boolean(). -match(B1, B2) when is_binary(B1) and is_binary(B2) -> - match(words(B1), words(B2)); +match(Name, Filter) when is_binary(Name) and is_binary(Filter) -> + match(words(Name), words(Filter)); match([], []) -> true; match([H|T1], [H|T2]) -> match(T1, T2); -match([_H|T1], [<<"+">>|T2]) -> +match([_H|T1], ['+'|T2]) -> match(T1, T2); -match(_, [<<"#">>]) -> +match(_, ['#']) -> true; match([_H1|_], [_H2|_]) -> false; @@ -100,57 +110,78 @@ match([], [_H|_T2]) -> false. %% ------------------------------------------------------------------------ -%% topic validate +%% Validate Topic %% ------------------------------------------------------------------------ --spec validate({Type :: subscribe | publish, Topic :: binary()}) -> boolean(). validate({_, <<>>}) -> false; -validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_LEN) -> +validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) -> false; -validate({subscribe, Topic}) when is_binary(Topic) -> - valid(words(Topic)); -validate({publish, Topic}) when is_binary(Topic) -> +validate({filter, Topic}) when is_binary(Topic) -> + validate2(words(Topic)); +validate({name, Topic}) when is_binary(Topic) -> Words = words(Topic), - valid(Words) and (not include_wildcard(Topic)). + validate2(Words) and (not include_wildcard(Words)). -triples(B) when is_binary(B) -> - triples(binary_to_list(B), []). +validate2([]) -> + true; +validate2(['#']) -> % end with '#' + true; +validate2(['#'|Words]) when length(Words) > 0 -> + false; +validate2([''|Words]) -> + validate2(Words); +validate2(['+'|Words]) -> + validate2(Words); +validate2([W|Words]) -> + case validate3(W) of + true -> validate2(Words); + false -> false + end. -triples(S, Acc) -> - triples(rchr(S, $/), S, Acc). +validate3(<<>>) -> + true; +validate3(<>) when C == $#; C == $+; C == 0 -> + false; +validate3(<<_/utf8, Rest/binary>>) -> + validate3(Rest). -triples(0, S, Acc) -> - [{root, l2b(S), l2b(S)}|Acc]; +include_wildcard([]) -> false; +include_wildcard(['#'|_T]) -> true; +include_wildcard(['+'|_T]) -> true; +include_wildcard([ _ | T]) -> include_wildcard(T). -triples(I, S, Acc) -> - S1 = substr(S, 1, I-1), - S2 = substr(S, I+1), - triples(S1, [{l2b(S1), l2b(S2), l2b(S)}|Acc]). +%% ------------------------------------------------------------------------ +%% Topic to Triples +%% ------------------------------------------------------------------------ +triples(Topic) when is_binary(Topic) -> + triples(words(Topic), root, []). +triples([], _Parent, Acc) -> + reverse(Acc); + +triples([W|Words], Parent, Acc) -> + Node = join(Parent, W), + triples(Words, Node, [{Parent, W, Node}|Acc]). + +join(root, W) -> + W; +join(Parent, W) -> + <<(bin(Parent))/binary, $/, (bin(W))/binary>>. + +bin('') -> <<>>; +bin('+') -> <<"+">>; +bin('#') -> <<"#">>; +bin( B ) when is_binary(B) -> B. + +%% ------------------------------------------------------------------------ +%% Split Topic to Words +%% ------------------------------------------------------------------------ words(Topic) when is_binary(Topic) -> - words(binary_to_list(Topic), [], []). + [word(W) || W <- binary:split(Topic, <<"/">>, [global])]. -words([], Word, ResAcc) -> - reverse([l2b(reverse(W)) || W <- [Word|ResAcc]]); +word(<<>>) -> ''; +word(<<"+">>) -> '+'; +word(<<"#">>) -> '#'; +word(Bin) -> Bin. -words([$/|Topic], Word, ResAcc) -> - words(Topic, [], [Word|ResAcc]); - -words([C|Topic], Word, ResAcc) -> - words(Topic, [C|Word], ResAcc). - -valid([<<>>|Words]) -> valid2(Words); -valid(Words) -> valid2(Words). - -valid2([<<>>|_Words]) -> false; -valid2([<<"#">>|Words]) when length(Words) > 0 -> false; -valid2([_|Words]) -> valid2(Words); -valid2([]) -> true. - -include_wildcard(<<>>) -> false; -include_wildcard(<<$#, _T/binary>>) -> true; -include_wildcard(<<$+, _T/binary>>) -> true; -include_wildcard(<<_H, T/binary>>) -> include_wildcard(T). - -l2b(L) -> list_to_binary(L). diff --git a/apps/emqtt/test/emqtt_topic_tests.erl b/apps/emqtt/test/emqtt_topic_tests.erl index 6c353c4d4..07c19b2f5 100644 --- a/apps/emqtt/test/emqtt_topic_tests.erl +++ b/apps/emqtt/test/emqtt_topic_tests.erl @@ -30,17 +30,52 @@ -include_lib("eunit/include/eunit.hrl"). +-define(N, 100000). + validate_test() -> - ?assert( validate({subscribe, <<"a/b/c">>}) ), - ?assert( validate({subscribe, <<"/a/b">>}) ), - ?assert( validate({subscribe, <<"/+/x">>}) ), - ?assert( validate({subscribe, <<"/a/b/c/#">>}) ), - ?assertNot( validate({subscribe, <<"a/#/c">>}) ). + ?assert( validate({filter, <<"a/b/c">>}) ), + ?assert( validate({filter, <<"/a/b">>}) ), + ?assert( validate({filter, <<"/+/x">>}) ), + ?assert( validate({filter, <<"/a/b/c/#">>}) ), + ?assertNot( validate({filter, <<"a/#/c">>}) ). + +match_test() -> + ?assert( match(<<"a/b/ccc">>, <<"a/#">>) ), + Name = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>, + Filter = <<"/abkc/19383/+/akakdkkdkak/#">>, + ?assert( match(Name, Filter) ), + ?debugFmt("Match ~p with ~p", [Name, Filter]), + {Time, _} = timer:tc(fun() -> + [match(Name, Filter) || _I <- lists:seq(1, ?N)] + end), + ?debugFmt("Time for match: ~p(micro)", [Time/?N]), + ok. + +triples_test() -> + Topic = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>, + {Time, _} = timer:tc(fun() -> + [triples(Topic) || _I <- lists:seq(1, ?N)] + end), + ?debugFmt("Time for triples: ~p(micro)", [Time/?N]), + ok. type_test() -> ?assertEqual(direct, type(#topic{name = <<"/a/b/cdkd">>})), ?assertEqual(wildcard, type(#topic{name = <<"/a/+/d">>})), ?assertEqual(wildcard, type(#topic{name = <<"/a/b/#">>})). +words_test() -> + ?debugFmt("Words: ~p", [words(<<"/abkc/19383/+/akakdkkdkak/#">>)]), + ?assertMatch(['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'], words(<<"/abkc/19383/+/akakdkkdkak/#">>)), + {Time, _} = timer:tc(fun() -> + [words(<<"/abkc/19383/+/akakdkkdkak/#">>) || _I <- lists:seq(1, ?N)] + end), + ?debugFmt("Time for words: ~p(micro)", [Time/?N]), + {Time2, _} = timer:tc(fun() -> + [binary:split(<<"/abkc/19383/+/akakdkkdkak/#">>, <<"/">>, [global]) || _I <- lists:seq(1, ?N)] + end), + ?debugFmt("Time for binary:split: ~p(micro)", [Time2/?N]), + ok. + -endif. From c83d6d0e01000fac42b3fa98c3b4b0c7052bd797 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 19 Jan 2015 21:27:47 +0800 Subject: [PATCH 4/8] topic tests --- apps/emqtt/src/emqtt_oldtopic.erl | 11 ++++++- apps/emqtt/src/emqtt_topic.erl | 6 ++++ apps/emqtt/test/emqtt_topic_tests.erl | 42 +++++++++++++++++++++++++-- 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/apps/emqtt/src/emqtt_oldtopic.erl b/apps/emqtt/src/emqtt_oldtopic.erl index 87c9eb3e9..1ad289df8 100644 --- a/apps/emqtt/src/emqtt_oldtopic.erl +++ b/apps/emqtt/src/emqtt_oldtopic.erl @@ -1,6 +1,6 @@ -module(emqtt_oldtopic). --export([triples/1]). +-export([triples/1, test/0]). triples(B) when is_binary(B) -> triples(binary_to_list(B), []). @@ -17,3 +17,12 @@ triples(I, S, Acc) -> triples(S1, [{l2b(S1), l2b(S2), l2b(S)}|Acc]). l2b(L) -> list_to_binary(L). + +test() -> + N = 100000, + Topic = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>, + {Time, _} = timer:tc(fun() -> + [triples(Topic) || _I <- lists:seq(1, N)] + end), + io:format("Time for triples: ~p(micro)", [Time/N]), + ok. diff --git a/apps/emqtt/src/emqtt_topic.erl b/apps/emqtt/src/emqtt_topic.erl index d04ca6aa0..f29501e63 100644 --- a/apps/emqtt/src/emqtt_topic.erl +++ b/apps/emqtt/src/emqtt_topic.erl @@ -100,12 +100,18 @@ match([], []) -> true; match([H|T1], [H|T2]) -> match(T1, T2); +match([<<$$, _/binary>>|_], ['+'|_]) -> + false; match([_H|T1], ['+'|T2]) -> match(T1, T2); +match([<<$$, _/binary>>|_], ['#']) -> + false; match(_, ['#']) -> true; match([_H1|_], [_H2|_]) -> false; +match([_H1|_], []) -> + false; match([], [_H|_T2]) -> false. diff --git a/apps/emqtt/test/emqtt_topic_tests.erl b/apps/emqtt/test/emqtt_topic_tests.erl index 07c19b2f5..2f0a85d5b 100644 --- a/apps/emqtt/test/emqtt_topic_tests.erl +++ b/apps/emqtt/test/emqtt_topic_tests.erl @@ -33,18 +33,51 @@ -define(N, 100000). validate_test() -> + ?assert( validate({filter, <<"sport/tennis/#">>}) ), ?assert( validate({filter, <<"a/b/c">>}) ), ?assert( validate({filter, <<"/a/b">>}) ), ?assert( validate({filter, <<"/+/x">>}) ), ?assert( validate({filter, <<"/a/b/c/#">>}) ), - ?assertNot( validate({filter, <<"a/#/c">>}) ). + ?assertNot( validate({filter, <<"a/#/c">>}) ), + ?assertNot( validate({filter, <<"sport/tennis#">>}) ), + ?assertNot( validate({filter, <<"sport/tennis/#/ranking">>}) ). + +sigle_level_validate_test() -> + ?assert( validate({filter, <<"+">>}) ), + ?assert( validate({filter, <<"+/tennis/#">>}) ), + ?assertNot( validate({filter, <<"sport+">>}) ), + ?assert( validate({filter, <<"sport/+/player1">>}) ). match_test() -> + ?assert( match(<<"sport/tennis/player1">>, <<"sport/tennis/player1/#">>) ), + ?assert( match(<<"sport/tennis/player1/ranking">>, <<"sport/tennis/player1/#">>) ), + ?assert( match(<<"sport/tennis/player1/score/wimbledon">>, <<"sport/tennis/player1/#">>) ), + + ?assert( match(<<"sport">>, <<"sport/#">>) ), + ?assert( match(<<"sport">>, <<"#">>) ), + ?assert( match(<<"/sport/football/score/1">>, <<"#">>) ). + +sigle_level_match_test() -> + ?assert( match(<<"sport/tennis/player1">>, <<"sport/tennis/+">>) ), + ?assertNot( match(<<"sport/tennis/player1/ranking">>, <<"sport/tennis/+">>) ), + ?assertNot( match(<<"sport">>, <<"sport/+">>) ), + ?assert( match(<<"sport/">>, <<"sport/+">>) ), + ?assert( match(<<"/finance">>, <<"+/+">>) ), + ?assert( match(<<"/finance">>, <<"/+">>) ), + ?assertNot( match(<<"/finance">>, <<"+">>) ). + +sys_match_test() -> + ?assert( match(<<"$SYS/borker/clients/testclient">>, <<"$SYS/#">>) ), + ?assert( match(<<"$SYS/borker">>, <<"$SYS/+">>) ), + ?assertNot( match(<<"$SYS/borker">>, <<"+/+">>) ), + ?assertNot( match(<<"$SYS/borker">>, <<"#">>) ). + +match_perf_test() -> ?assert( match(<<"a/b/ccc">>, <<"a/#">>) ), Name = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>, Filter = <<"/abkc/19383/+/akakdkkdkak/#">>, ?assert( match(Name, Filter) ), - ?debugFmt("Match ~p with ~p", [Name, Filter]), + %?debugFmt("Match ~p with ~p", [Name, Filter]), {Time, _} = timer:tc(fun() -> [match(Name, Filter) || _I <- lists:seq(1, ?N)] end), @@ -52,6 +85,10 @@ match_test() -> ok. triples_test() -> + Triples = [{root, <<"a">>, <<"a">>}, {<<"a">>, <<"b">>, <<"a/b">>}], + ?assertMatch(Triples, triples(<<"a/b">>) ). + +triples_perf_test() -> Topic = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>, {Time, _} = timer:tc(fun() -> [triples(Topic) || _I <- lists:seq(1, ?N)] @@ -65,7 +102,6 @@ type_test() -> ?assertEqual(wildcard, type(#topic{name = <<"/a/b/#">>})). words_test() -> - ?debugFmt("Words: ~p", [words(<<"/abkc/19383/+/akakdkkdkak/#">>)]), ?assertMatch(['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'], words(<<"/abkc/19383/+/akakdkkdkak/#">>)), {Time, _} = timer:tc(fun() -> [words(<<"/abkc/19383/+/akakdkkdkak/#">>) || _I <- lists:seq(1, ?N)] From 7d5f49797937b19d3a99838c2c5b09719a151998 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 19 Jan 2015 21:38:11 +0800 Subject: [PATCH 5/8] performance --- doc/pubsub.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/doc/pubsub.md b/doc/pubsub.md index 298043b62..8978c29e3 100644 --- a/doc/pubsub.md +++ b/doc/pubsub.md @@ -17,3 +17,23 @@ PubQos | SubQos | In Message | Out Message ## Publish +## Performance + +Mac Air(11): + +Function | Time(microseconds) +-------------|-------------------- +match | 6.25086 +triples | 13.86881 +words | 3.41177 +binary:split | 3.03776 + +iMac: + +Function | Time(microseconds) +-------------|-------------------- +match | 3.2348 +triples | 6.93524 +words | 1.89616 +binary:split | 1.65243 + From 537e18b3764c43f6addd7f2a3691d9b624713bb5 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 19 Jan 2015 21:51:36 +0800 Subject: [PATCH 6/8] rm oldtopic --- apps/emqtt/src/emqtt_oldtopic.erl | 28 ---------------------------- 1 file changed, 28 deletions(-) delete mode 100644 apps/emqtt/src/emqtt_oldtopic.erl diff --git a/apps/emqtt/src/emqtt_oldtopic.erl b/apps/emqtt/src/emqtt_oldtopic.erl deleted file mode 100644 index 1ad289df8..000000000 --- a/apps/emqtt/src/emqtt_oldtopic.erl +++ /dev/null @@ -1,28 +0,0 @@ --module(emqtt_oldtopic). - --export([triples/1, test/0]). - -triples(B) when is_binary(B) -> - triples(binary_to_list(B), []). - -triples(S, Acc) -> - triples(string:rchr(S, $/), S, Acc). - -triples(0, S, Acc) -> - [{root, l2b(S), l2b(S)}|Acc]; - -triples(I, S, Acc) -> - S1 = string:substr(S, 1, I-1), - S2 = string:substr(S, I+1), - triples(S1, [{l2b(S1), l2b(S2), l2b(S)}|Acc]). - -l2b(L) -> list_to_binary(L). - -test() -> - N = 100000, - Topic = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>, - {Time, _} = timer:tc(fun() -> - [triples(Topic) || _I <- lists:seq(1, N)] - end), - io:format("Time for triples: ~p(micro)", [Time/N]), - ok. From 41471da2e80c4f76aa988f24d97a79092a9af31b Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 19 Jan 2015 22:08:53 +0800 Subject: [PATCH 7/8] 0.3.1 fix topic match --- CHANGELOG.md | 9 +++++++++ apps/emqtt/include/emqtt_topic.hrl | 26 +++++++++++++------------- apps/emqtt/src/emqtt_pubsub.erl | 4 ++-- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 54cf67e1c..e7671ab7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ + eMQTT ChangeLog ================== @@ -6,6 +7,14 @@ v0.3.1-beta (2015-01-24) Feature: HTTP POST API to support 'qos', 'retain' parameters +Feature: $SYS system topics support + +Change: Rewrite emqtt_topic.erl, use '', '#', '+' to replace <<"">>, <<"#">>, <<"+">> + +Change: fix emqtt_pubsub.erl to match '#', '+' + +Tests: emqtt_topic_tests.erl add more test cases + v0.3.0-alpha (2015-01-18) ------------------------ diff --git a/apps/emqtt/include/emqtt_topic.hrl b/apps/emqtt/include/emqtt_topic.hrl index 4cb9e4dda..5bd171d37 100644 --- a/apps/emqtt/include/emqtt_topic.hrl +++ b/apps/emqtt/include/emqtt_topic.hrl @@ -1,5 +1,5 @@ %%----------------------------------------------------------------------------- -%% Copyright (c) 2015, Feng Lee +%% Copyright (c) 2012-2015, Feng Lee %% %% Permission is hereby granted, free of charge, to any person obtaining a copy %% of this software and associated documentation files (the "Software"), to deal @@ -24,32 +24,32 @@ %% Core PubSub Topic %%------------------------------------------------------------------------------ -record(topic, { - name :: binary(), - node :: node() + name :: binary(), + node :: node() }). -type topic() :: #topic{}. -record(topic_subscriber, { - topic :: binary(), - qos = 0 :: non_neg_integer(), - subpid :: pid() + topic :: binary(), + qos = 0 :: non_neg_integer(), + subpid :: pid() }). -record(topic_trie_node, { - node_id :: binary(), - edge_count = 0 :: non_neg_integer(), - topic :: binary() + node_id :: binary() | atom(), + edge_count = 0 :: non_neg_integer(), + topic :: binary() }). -record(topic_trie_edge, { - node_id :: binary(), - word :: binary() | char() + node_id :: binary() | atom(), + word :: binary() | atom() }). -record(topic_trie, { - edge :: #topic_trie_edge{}, - node_id :: binary() + edge :: #topic_trie_edge{}, + node_id :: binary() | atom() }). %%------------------------------------------------------------------------------ diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index c05e344f1..5736f7a83 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -300,10 +300,10 @@ trie_match(NodeId, [W|Words], ResAcc) -> [#topic_trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc); [] -> Acc end - end, 'trie_match_#'(NodeId, ResAcc), [W, <<"+">>]). + end, 'trie_match_#'(NodeId, ResAcc), [W, '+']). 'trie_match_#'(NodeId, ResAcc) -> - case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word = <<"#">>}) of + case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word = '#'}) of [#topic_trie{node_id=ChildId}] -> mnesia:read(topic_trie_node, ChildId) ++ ResAcc; [] -> From 9cabf1be0ccc947223d1507398fa9b49cd0fb05d Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 19 Jan 2015 22:22:07 +0800 Subject: [PATCH 8/8] 0.3.0-beta --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7671ab7d..dc000b780 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ eMQTT ChangeLog ================== -v0.3.1-beta (2015-01-24) +v0.3.0-beta (2015-01-19) ------------------------ Feature: HTTP POST API to support 'qos', 'retain' parameters