From 31c687f26cdceca76a64cbe37d229cacfd73bda2 Mon Sep 17 00:00:00 2001 From: erylee Date: Tue, 1 Jan 2013 20:16:02 +0800 Subject: [PATCH] support trie topic structure --- include/emqtt.hrl | 8 +- src/emqtt_client.erl | 2 +- src/emqtt_frame.erl | 4 +- src/emqtt_router.erl | 172 ++++++++++++++++++++++++++++++------------- src/emqtt_topic.erl | 80 +++++++++++++------- 5 files changed, 185 insertions(+), 81 deletions(-) diff --git a/include/emqtt.hrl b/include/emqtt.hrl index a4872e26b..5d4dbefe6 100644 --- a/include/emqtt.hrl +++ b/include/emqtt.hrl @@ -29,7 +29,13 @@ %name: <<"a/b/c">> %node: node() %words: [<<"a">>, <<"b">>, <<"c">>] --record(topic, {name, node, words}). +-record(topic, {name, node}). + +-record(trie, {edge, node_id}). + +-record(trie_node, {node_id, edge_count=0, topic}). + +-record(trie_edge, {node_id, word}). %topic: topic name diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index 4a6270702..e6e2d2171 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -383,7 +383,7 @@ stop(Reason, State ) -> {stop, Reason, State}. valid_client_id(ClientId) -> - ClientIdLen = size(ClientId), + ClientIdLen = length(ClientId), 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. retained(false, _Topic, _Msg) -> diff --git a/src/emqtt_frame.erl b/src/emqtt_frame.erl index aa6d126e6..5b9a76c87 100644 --- a/src/emqtt_frame.erl +++ b/src/emqtt_frame.erl @@ -23,7 +23,7 @@ -export([serialise/1]). -define(RESERVED, 0). --define(PROTOCOL_MAGIC, <<"MQIsdp">>). +-define(PROTOCOL_MAGIC, "MQIsdp"). -define(MAX_LEN, 16#fffffff). -define(HIGHBIT, 2#10000000). -define(LOWBITS, 2#01111111). @@ -145,7 +145,7 @@ parse_utf(Bin, _) -> parse_utf(Bin). parse_utf(<>) -> - {Str, Rest}. + {binary_to_list(Str), Rest}. parse_msg(Bin, 0) -> {undefined, Bin}; diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl index 288cd9ece..98e503d88 100644 --- a/src/emqtt_router.erl +++ b/src/emqtt_router.erl @@ -11,7 +11,6 @@ %% Developer of the eMQTT Code is %% Copyright (c) 2012 Ery Lee. All rights reserved. %% - -module(emqtt_router). -include("emqtt.hrl"). @@ -22,11 +21,12 @@ -export([start_link/0]). --export([topics/1, +-export([topics/0, subscribe/2, unsubscribe/2, publish/2, route/2, + match/1, down/1]). -behaviour(gen_server). @@ -43,20 +43,17 @@ start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []). -topics(direct) -> - mnesia:dirty_all_keys(direct_topic); - -topics(wildcard) -> - mnesia:dirty_all_keys(wildcard_topic). +topics() -> + mnesia:dirty_all_keys(topic). subscribe({Topic, Qos}, Client) when is_pid(Client) -> gen_server2:call(?MODULE, {subscribe, {Topic, Qos}, Client}). -unsubscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) -> +unsubscribe(Topic, Client) when is_list(Topic) and is_pid(Client) -> gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}). %publish to cluster node. -publish(Topic, Msg) when is_binary(Topic) and is_record(Msg, mqtt_msg) -> +publish(Topic, Msg) when is_list(Topic) and is_record(Msg, mqtt_msg) -> lists:foreach(fun(#topic{name=Name, node=Node}) -> case Node == node() of true -> route(Name, Msg); @@ -68,63 +65,54 @@ publish(Topic, Msg) when is_binary(Topic) and is_record(Msg, mqtt_msg) -> route(Topic, Msg) -> [Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Topic)]. -match(Topic) when is_binary(Topic) -> - DirectMatches = mnesia:dirty_read(direct_topic, Topic), - TopicWords = emqtt_topic:words(Topic), - WildcardQuery = qlc:q([T || T = #topic{words=Words} - <- mnesia:table(wildcard_topic), - emqtt_topic:match(TopicWords, Words)]), % - - {atomic, WildcardMatches} = mnesia:transaction(fun() -> qlc:e(WildcardQuery) end), - %mnesia:async_dirty(fun qlc:e/1, WildcardQuery), - %?INFO("~p", [WildcardMatches]), - DirectMatches ++ WildcardMatches. +match(Topic) when is_list(Topic) -> + TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]), + Names = [Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined], + lists:flatten([mnesia:dirty_read(topic, Name) || Name <- Names]). down(Client) when is_pid(Client) -> gen_server2:cast(?MODULE, {down, Client}). init([]) -> - mnesia:create_table(direct_topic, [ + mnesia:create_table(trie, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, trie)}]), + mnesia:add_table_copy(trie, node(), ram_copies), + mnesia:create_table(trie_node, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, trie_node)}]), + mnesia:add_table_copy(trie_node, node(), ram_copies), + mnesia:create_table(topic, [ {type, bag}, {record_name, topic}, {ram_copies, [node()]}, {attributes, record_info(fields, topic)}]), - mnesia:add_table_copy(direct_topic, node(), ram_copies), - mnesia:create_table(wildcard_topic, [ - {type, bag}, - {index, [#topic.words]}, - {record_name, topic}, - {ram_copies, [node()]}, - {attributes, record_info(fields, topic)}]), - mnesia:add_table_copy(wildcard_topic, node(), ram_copies), + mnesia:add_table_copy(topic, node(), ram_copies), ets:new(subscriber, [bag, named_table, {keypos, 2}]), ?INFO_MSG("emqtt_router is started."), {ok, #state{}}. -handle_call({subscribe, {Name, Qos}, Client}, _From, State) -> - Topic = #topic{name=Name, node=node(), words=emqtt_topic:words(Name)}, - case emqtt_topic:type(Topic) of - direct -> - ok = mnesia:dirty_write(direct_topic, Topic); - wildcard -> - ok = mnesia:dirty_write(wildcard_topic, Topic) - end, - ets:insert(subscriber, #subscriber{topic=Name, qos=Qos, client=Client}), - emqtt_retained:send(Name, Client), - {reply, ok, State}; +handle_call({subscribe, {Topic, Qos}, Client}, _From, State) -> + case mnesia:transaction(fun trie_add/1, [Topic]) of + {atomic, _} -> + ets:insert(subscriber, #subscriber{topic=Topic, qos=Qos, client=Client}), + emqtt_retained:send(Topic, Client), + {reply, ok, State}; + {aborted, Reason} -> + {reply, {error, Reason}, State} + end; handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. handle_cast({unsubscribe, Topic, Client}, State) -> - ets:match_delete(subscriber, {subscriber, Topic, '_', Client}), + ets:match_delete(subscriber, #subscriber{topic=Topic, client=Client, _='_'}), try_remove_topic(Topic), {noreply, State}; handle_cast({down, Client}, State) -> - case ets:match_object(subscriber, {subscriber, '_', '_', Client}) of - [] -> - ignore; + case ets:match_object(subscriber, #subscriber{client=Client, _='_'}) of + [] -> ignore; Subs -> [ets:delete_object(subscriber, Sub) || Sub <- Subs], [try_remove_topic(Topic) || #subscriber{topic=Topic} <- Subs] @@ -134,7 +122,6 @@ handle_cast({down, Client}, State) -> handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. - handle_info(Info, State) -> {stop, {badinfo, Info}, State}. @@ -151,12 +138,93 @@ try_remove_topic(Name) -> case ets:member(subscriber, Name) of false -> Topic = emqtt_topic:new(Name), - case emqtt_topic:type(Topic) of - direct -> - mnesia:dirty_delete_object(direct_topic, Topic); - wildcard -> - mnesia:dirty_delete_object(wildcard_topic, Topic) - end; - true -> ok + Fun = fun() -> + mnesia:delete_object(topic, Topic), + case mnesia:read(topic, Topic) of + [] -> trie_delete(Name); + _ -> ignore + end + end, + mnesia:transaction(Fun); + true -> + ok + end. + +trie_add(Topic) -> + mnesia:write(emqtt_topic:new(Topic)), + case mnesia:read(trie_node, Topic) of + [TrieNode=#trie_node{topic=undefined}] -> + mnesia:write(TrieNode#trie_node{topic=Topic}); + [#trie_node{topic=Topic}] -> + ignore; + [] -> + %add trie path + [trie_add_path(Triple) || Triple <- emqtt_topic:triples(Topic)], + %add last node + mnesia:write(#trie_node{node_id=Topic, topic=Topic}) + end. + +trie_delete(Topic) -> + case mnesia:read(trie_node, Topic) of + [#trie_node{edge_count=0}] -> + mnesia:delete({trie_node, Topic}), + trie_delete_path(lists:reverse(emqtt_topic:triples(Topic))); + [TrieNode] -> + mnesia:write(TrieNode#trie_node{topic=Topic}); + [] -> + ignore + end. + +trie_match(Words) -> + trie_match(root, Words, []). + +trie_match(NodeId, [], ResAcc) -> + mnesia:read(trie_node, NodeId) ++ 'trie_match_#'(NodeId, ResAcc); + +trie_match(NodeId, [W|Words], ResAcc) -> + lists:foldl(fun(WArg, Acc) -> + case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of + [#trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc); + [] -> Acc + end + end, 'trie_match_#'(NodeId, ResAcc), [W, "+"]). + +'trie_match_#'(NodeId, ResAcc) -> + case mnesia:read(trie, #trie_edge{node_id=NodeId, word="#"}) of + [#trie{node_id=ChildId}] -> + mnesia:read(trie_node, ChildId) ++ ResAcc; + [] -> + ResAcc + end. + +trie_add_path({Node, Word, Child}) -> + Edge = #trie_edge{node_id=Node, word=Word}, + case mnesia:read(trie_node, Node) of + [TrieNode = #trie_node{edge_count=Count}] -> + case mnesia:read(trie, Edge) of + [] -> + mnesia:write(TrieNode#trie_node{edge_count=Count+1}), + mnesia:write(#trie{edge=Edge, node_id=Child}); + [_] -> + ok + end; + [] -> + mnesia:write(#trie_node{node_id=Node, edge_count=1}), + mnesia:write(#trie{edge=Edge, node_id=Child}) + end. + +trie_delete_path([{NodeId, Word, _} | RestPath]) -> + Edge = #trie_edge{node_id=NodeId, word=Word}, + mnesia:delete({trie, Edge}), + case mnesia:read(trie_node, NodeId) of + [#trie_node{edge_count=1, topic=undefined}] -> + mnesia:delete({trie_node, NodeId}), + trie_delete_path(RestPath); + [TrieNode=#trie_node{edge_count=1, topic=_}] -> + mnesia:write(TrieNode#trie_node{edge_count=0}); + [TrieNode=#trie_node{edge_count=C}] -> + mnesia:write(TrieNode#trie_node{edge_count=C-1}); + [] -> + throw({notfound, NodeId}) end. diff --git a/src/emqtt_topic.erl b/src/emqtt_topic.erl index 6fb4b3e8a..d6259d1be 100644 --- a/src/emqtt_topic.erl +++ b/src/emqtt_topic.erl @@ -13,6 +13,8 @@ %% -module(emqtt_topic). +-import(string, [rchr/2, substr/2, substr/3]). + %% ------------------------------------------------------------------------ %% Topic semantics and usage %% ------------------------------------------------------------------------ @@ -35,25 +37,30 @@ -include("emqtt.hrl"). --export([new/1, type/1, match/2, validate/1, words/1]). +-export([new/1, + type/1, + match/2, + validate/1, + triples/1, + words/1]). -export([test/0]). -define(MAX_LEN, 64*1024). -new(Name) when is_binary(Name) -> - #topic{name=Name, node=node(), words=words(Name)}. +new(Name) when is_list(Name) -> + #topic{name=Name, node=node()}. %% ------------------------------------------------------------------------ %% topic type: direct or wildcard %% ------------------------------------------------------------------------ -type(#topic{words=Words}) -> - type(Words); +type(#topic{name=Name}) -> + type(words(Name)); type([]) -> direct; -type([<<"#">>]) -> +type(["#"]) -> wildcard; -type([<<"+">>|_T]) -> +type(["+"|_T]) -> wildcard; type([_|T]) -> type(T). @@ -65,9 +72,9 @@ match([], []) -> true; match([H|T1], [H|T2]) -> match(T1, T2); -match([_H|T1], [<<"+">>|T2]) -> +match([_H|T1], ["+"|T2]) -> match(T1, T2); -match(_, [<<"#">>]) -> +match(_, ["#"]) -> true; match([_H1|_], [_H2|_]) -> false; @@ -78,38 +85,61 @@ match([], [_H|_T2]) -> %% ------------------------------------------------------------------------ %% topic validate %% ------------------------------------------------------------------------ -validate({_, <<>>}) -> +validate({_, ""}) -> false; -validate({_, Topic}) when size(Topic) > ?MAX_LEN -> +validate({_, Topic}) when length(Topic) > ?MAX_LEN -> false; -validate({subscribe, Topic}) when is_binary(Topic) -> +validate({subscribe, Topic}) when is_list(Topic) -> valid(words(Topic)); -validate({publish, Topic}) when is_binary(Topic) -> +validate({publish, Topic}) when is_list(Topic) -> Words = words(Topic), valid(Words) and (not include_wildcard(Words)). -words(Topic) when is_binary(Topic) -> - binary:split(Topic, [<<"/">>], [global]). +triples(S) when is_list(S) -> + triples(S, []). -valid([<<>>|Words]) -> valid2(Words); +triples(S, Acc) -> + triples(rchr(S, $/), S, Acc). + +triples(0, S, Acc) -> + [{root, S, S}|Acc]; + +triples(I, S, Acc) -> + S1 = substr(S, 1, I-1), + S2 = substr(S, I+1), + triples(S1, [{S1, S2, S}|Acc]). + +words(Topic) when is_list(Topic) -> + words(Topic, [], []). + +words([], Word, ResAcc) -> + lists:reverse([Word|ResAcc]); + +words([$/|Topic], Word, ResAcc) -> + words(Topic, [], [Word|ResAcc]); + +words([C|Topic], Word, ResAcc) -> + words(Topic, lists:reverse([C|Word]), ResAcc). + +valid([""|Words]) -> valid2(Words); valid(Words) -> valid2(Words). -valid2([<<>>|_Words]) -> false; -valid2([<<"#">>|Words]) when length(Words) > 0 -> false; +valid2([""|_Words]) -> false; +valid2(["#"|Words]) when length(Words) > 0 -> false; valid2([_|Words]) -> valid2(Words); valid2([]) -> true. include_wildcard([]) -> false; -include_wildcard([<<"#">>|_T]) -> true; -include_wildcard([<<"+">>|_T]) -> true; +include_wildcard(["#"|_T]) -> true; +include_wildcard(["+"|_T]) -> true; include_wildcard([_H|T]) -> include_wildcard(T). test() -> - true = validate({subscribe, <<"a/b/c">>}), - true = validate({subscribe, <<"/a/b">>}), - true = validate({subscribe, <<"/+/x">>}), - true = validate({subscribe, <<"/a/b/c/#">>}), - false = validate({subscribe, <<"a/#/c">>}), + true = validate({subscribe, "a/b/c"}), + true = validate({subscribe, "/a/b"}), + true = validate({subscribe, "/+/x"}), + true = validate({subscribe, "/a/b/c/#"}), + false = validate({subscribe, "a/#/c"}), ok.