From be019ca0337f2fffc36b05454a15c106e137d2b4 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 8 Dec 2014 11:24:07 +0800 Subject: [PATCH] fix topic --- apps/emqtt/include/emqtt_internal.hrl | 82 ++++++++++++++++++--------- apps/emqtt/src/emqtt_client.erl | 1 - apps/emqtt/src/emqtt_topic.erl | 72 ++++++++++++----------- apps/emqtt/test/emqtt_topic_tests.erl | 24 ++++++++ 4 files changed, 115 insertions(+), 64 deletions(-) create mode 100644 apps/emqtt/test/emqtt_topic_tests.erl diff --git a/apps/emqtt/include/emqtt_internal.hrl b/apps/emqtt/include/emqtt_internal.hrl index 8054a9500..ed60b1ea4 100644 --- a/apps/emqtt/include/emqtt_internal.hrl +++ b/apps/emqtt/include/emqtt_internal.hrl @@ -1,36 +1,66 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% -%% The Initial Developer of the Original Code is ery.lee@gmail.com -%% Copyright (c) 2012 Ery Lee. All rights reserved. -%% - -%% ------------------------------------------- -%% banner -%% ------------------------------------------- - --record(internal_user, {username, passwdhash}). +%%----------------------------------------------------------------------------- +%% Copyright (c) 2014, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ +%%------------------------------------------------------------------------------ +%% pubsub topic +%%------------------------------------------------------------------------------ %name: <<"a/b/c">> %node: node() %words: [<<"a">>, <<"b">>, <<"c">>] --record(topic, {name, node}). +-record(topic, { + name :: binary(), + node :: node() +}). --record(trie, {edge, node_id}). +-type topic() :: #topic{}. --record(trie_node, {node_id, edge_count=0, topic}). +-record(topic_subscriber, { + topic :: binary(), + qos = 0 :: integer(), + subpid :: pid() +}). --record(trie_edge, {node_id, word}). +-record(topic_trie_node, { + node_id :: binary(), + edge_count = 0 :: non_neg_integer(), + topic :: binary() +}). -%topic: topic name +-record(topic_trie_edge, { + node_id :: binary(), + word :: binary() +}). --record(subscriber, {topic, qos, client}). +-record(topic_trie, { + edge :: #topic_trie_edge{}, + node_id :: binary() +}). + + +%%------------------------------------------------------------------------------ +%% internal user +%%------------------------------------------------------------------------------ +-record(internal_user, { + username :: binary(), + passwdhash :: binary() +}). diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index f665c3c1d..305c4237e 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -200,7 +200,6 @@ process_received_bytes(<<>>, State) -> process_received_bytes(Bytes, State = #state{ parse_state = ParseState, conn_name = ConnStr }) -> - ?INFO("~p~n", [Bytes]), case emqtt_frame:parse(Bytes, ParseState) of {more, ParseState1} -> {noreply, diff --git a/apps/emqtt/src/emqtt_topic.erl b/apps/emqtt/src/emqtt_topic.erl index 43cded3de..e82b02956 100644 --- a/apps/emqtt/src/emqtt_topic.erl +++ b/apps/emqtt/src/emqtt_topic.erl @@ -54,23 +54,25 @@ triples/1, words/1]). --export([test/0]). +-define(MAX_LEN, 1024). --define(MAX_LEN, 64*1024). - -new(Name) when is_list(Name) -> +-spec new(Name :: binary()) -> topic(). +new(Name) when is_binary(Name) -> #topic{name=Name, node=node()}. %% ------------------------------------------------------------------------ %% topic type: direct or wildcard %% ------------------------------------------------------------------------ -type(#topic{name=Name}) -> +-spec type(Topic :: topic()) -> direct | wildcard. +type(#topic{name=Name}) when is_binary(Name) -> type(words(Name)); -type([]) -> +type([]) -> direct; -type(["#"]) -> +type([<<>>|T]) -> + type(T); +type([<<$#, _/binary>>|_]) -> wildcard; -type(["+"|_T]) -> +type([<<$+, _/binary>>|_]) -> wildcard; type([_|T]) -> type(T). @@ -78,52 +80,55 @@ type([_|T]) -> %% ------------------------------------------------------------------------ %% topic match %% ------------------------------------------------------------------------ +-spec match(B1 :: binary(), B2 :: binary()) -> boolean(). +match(B1, B2) when is_binary(B1) and is_binary(B2) -> + match(words(B1), words(B2)); match([], []) -> true; match([H|T1], [H|T2]) -> match(T1, T2); -match([_H|T1], ["+"|T2]) -> +match([_H|T1], [<<"+">>|T2]) -> match(T1, T2); -match(_, ["#"]) -> +match(_, [<<"#">>]) -> true; match([_H1|_], [_H2|_]) -> false; match([], [_H|_T2]) -> false. - %% ------------------------------------------------------------------------ %% topic validate %% ------------------------------------------------------------------------ -validate({_, ""}) -> +-spec validate({Type :: subscribe | publish, Topic :: binary()}) -> boolean(). +validate({_, <<>>}) -> false; -validate({_, Topic}) when length(Topic) > ?MAX_LEN -> +validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_LEN) -> false; -validate({subscribe, Topic}) when is_list(Topic) -> +validate({subscribe, Topic}) when is_binary(Topic) -> valid(words(Topic)); -validate({publish, Topic}) when is_list(Topic) -> +validate({publish, Topic}) when is_binary(Topic) -> Words = words(Topic), valid(Words) and (not include_wildcard(Words)). -triples(S) when is_list(S) -> - triples(S, []). +triples(B) when is_binary(B) -> + triples(binary_to_list(B), []). triples(S, Acc) -> triples(rchr(S, $/), S, Acc). triples(0, S, Acc) -> - [{root, S, S}|Acc]; + [{root, l2b(S), l2b(S)}|Acc]; triples(I, S, Acc) -> S1 = substr(S, 1, I-1), S2 = substr(S, I+1), - triples(S1, [{S1, S2, S}|Acc]). + triples(S1, [{l2b(S1), l2b(S2), l2b(S)}|Acc]). -words(Topic) when is_list(Topic) -> - words(Topic, [], []). +words(Topic) when is_binary(Topic) -> + words(binary_to_list(Topic), [], []). words([], Word, ResAcc) -> - reverse([reverse(W) || W <- [Word|ResAcc]]); + reverse([l2b(reverse(W)) || W <- [Word|ResAcc]]); words([$/|Topic], Word, ResAcc) -> words(Topic, [], [Word|ResAcc]); @@ -131,25 +136,18 @@ words([$/|Topic], Word, ResAcc) -> words([C|Topic], Word, ResAcc) -> words(Topic, [C|Word], ResAcc). -valid([""|Words]) -> valid2(Words); +valid([<<>>|Words]) -> valid2(Words); valid(Words) -> valid2(Words). -valid2([""|_Words]) -> false; -valid2(["#"|Words]) when length(Words) > 0 -> false; +valid2([<<>>|_Words]) -> false; +valid2([<<"#">>|Words]) when length(Words) > 0 -> false; valid2([_|Words]) -> valid2(Words); valid2([]) -> true. -include_wildcard([]) -> false; -include_wildcard(["#"|_T]) -> true; -include_wildcard(["+"|_T]) -> true; -include_wildcard([_H|T]) -> include_wildcard(T). +include_wildcard(<<>>) -> false; +include_wildcard(<<$#, _T/binary>>) -> true; +include_wildcard(<<$+, _T/binary>>) -> true; +include_wildcard(<<_H, T/binary>>) -> include_wildcard(T). - -test() -> - true = validate({subscribe, "a/b/c"}), - true = validate({subscribe, "/a/b"}), - true = validate({subscribe, "/+/x"}), - true = validate({subscribe, "/a/b/c/#"}), - false = validate({subscribe, "a/#/c"}), - ok. +l2b(L) when is_list(L) -> list_to_binary(L). diff --git a/apps/emqtt/test/emqtt_topic_tests.erl b/apps/emqtt/test/emqtt_topic_tests.erl new file mode 100644 index 000000000..89e2eeb20 --- /dev/null +++ b/apps/emqtt/test/emqtt_topic_tests.erl @@ -0,0 +1,24 @@ +-module(emqtt_topic_tests). + +-include("emqtt_internal.hrl"). + +-import(emqtt_topic, [validate/1, type/1, match/2, triples/1, words/1]). + +-ifdef(TEST). + +-include_lib("enunit/include/enunit.hrl"). + +validate_test() -> + ?assert( validate({subscribe, <<"a/b/c">>}) ), + ?assert( validate({subscribe, <<"/a/b">>}) ), + ?assert( validate({subscribe, <<"/+/x">>}) ), + ?assert( validate({subscribe, <<"/a/b/c/#">>}) ), + ?assertNot( validate({subscribe, <<"a/#/c">>}) ). + +type_test() -> + ?assertEqual(direct, type(#topic{name = <<"/a/b/cdkd">>})), + ?assertEqual(wildcard, type(#type{name = <<"/a/+/d">>})), + ?assertEqual(wildcard, type(#type{name = <<"/a/b/#">>})). + +-endif. +