diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index 45d2a7330..f92ff58eb 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -41,6 +41,27 @@ %%------------------------------------------------------------------------------ -type pubsub() :: publish | subscribe. +%%------------------------------------------------------------------------------ +%% MQTT Topic +%%------------------------------------------------------------------------------ +-record(mqtt_topic, { + name :: binary(), + node :: node() +}). + +-type mqtt_topic() :: #mqtt_topic{}. + +%%------------------------------------------------------------------------------ +%% MQTT Subscriber +%%------------------------------------------------------------------------------ +-record(mqtt_subscriber, { + topic :: binary(), + qos = 0 :: non_neg_integer(), + subpid :: pid() +}). + +-type mqtt_subscriber() :: #mqtt_subscriber{}. + %%------------------------------------------------------------------------------ %% MQTT Client %%------------------------------------------------------------------------------ diff --git a/apps/emqttd/include/emqttd_packet.hrl b/apps/emqttd/include/emqttd_packet.hrl index d0160e776..c8c3c1efb 100644 --- a/apps/emqttd/include/emqttd_packet.hrl +++ b/apps/emqttd/include/emqttd_packet.hrl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqtt packet header. +%%% MQTT Packet Header. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -235,3 +235,4 @@ payload :: binary()}). -type mqtt_message() :: #mqtt_message{}. + diff --git a/apps/emqttd/include/emqttd_topic.hrl b/apps/emqttd/include/emqttd_topic.hrl deleted file mode 100644 index 4313d4885..000000000 --- a/apps/emqttd/include/emqttd_topic.hrl +++ /dev/null @@ -1,55 +0,0 @@ -%%----------------------------------------------------------------------------- -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ - -%%------------------------------------------------------------------------------ -%% Core PubSub Topic -%%------------------------------------------------------------------------------ --record(topic, { - name :: binary(), - %type = dynamic :: static | dynamic | bridge, - node :: node() -}). - --type topic() :: #topic{}. - --record(topic_subscriber, { - topic :: binary(), - qos = 0 :: non_neg_integer(), - subpid :: pid() -}). - --record(topic_trie_node, { - node_id :: binary() | atom(), - edge_count = 0 :: non_neg_integer(), - topic :: binary() -}). - --record(topic_trie_edge, { - node_id :: binary() | atom(), - word :: binary() | atom() -}). - --record(topic_trie, { - edge :: #topic_trie_edge{}, - node_id :: binary() | atom() -}). - diff --git a/apps/emqttd/src/emqtt_trie.erl b/apps/emqttd/src/emqtt_trie.erl new file mode 100644 index 000000000..96d1bdd68 --- /dev/null +++ b/apps/emqttd/src/emqtt_trie.erl @@ -0,0 +1,214 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% MQTT Topic Trie Tree. +%%% +%%% [Trie](http://en.wikipedia.org/wiki/Trie) +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_trie). + +-author('feng@emqtt.io'). + +%% Mnesia Callbacks +-export([mnesia/1]). + +-mnesia_create({mnesia, [create]}). +-mnesia_replicate({mnesia, [replicate]}). + +%% Trie API +-export([insert/1, find/1, delete/1]). + +-type node_id() :: binary() | atom(). + +-record(trie_node, { + node_id :: node_id(), + edge_count = 0 :: non_neg_integer(), + topic :: binary() | undefined +}). + +-record(trie_edge, { + node_id :: node_id(), + word :: binary() | atom() +}). + +-record(trie, { + edge :: #trie_edge{}, + node_id :: node_id() +}). + +%%%============================================================================= +%%% Mnesia Callbacks +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Create trie tables. +%% @end +%%------------------------------------------------------------------------------ +-spec mnesia(create | replicate) -> ok. +mnesia(create) -> + %% trie tree tables + ok = emqttd_mnesia:create_table(trie, [ + {ram_copies, [node()]}, + {record_name, trie}, + {attributes, record_info(fields, trie)}]), + ok = emqttd_mnesia:create_table(trie_node, [ + {ram_copies, [node()]}, + {record_name, trie_node}, + {attributes, record_info(fields, trie_node)}]); + +%%------------------------------------------------------------------------------ +%% @doc Replicate trie tables. +%% @end +%%------------------------------------------------------------------------------ +mnesia(replicate) -> + ok = emqttd_mnesia:copy_table(trie), + ok = emqttd_mnesia:copy_table(trie_node). + +%%%============================================================================= +%%% API +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Insert topic to trie tree. +%% @end +%%------------------------------------------------------------------------------ +insert(Topic) when is_binary(Topic) -> + case mnesia:read(trie_node, Topic) of + [#trie_node{topic=Topic}] -> + ok; + [TrieNode=#trie_node{topic=undefined}] -> + mnesia:write(TrieNode#trie_node{topic=Topic}); + [] -> + %add trie path + [add_path(Triple) || Triple <- emqttd_topic:triples(Topic)], + %add last node + mnesia:write(#trie_node{node_id=Topic, topic=Topic}) + end. + +%%------------------------------------------------------------------------------ +%% @doc Find trie nodes that match topic. +%% @end +%%------------------------------------------------------------------------------ +find(Topic) when is_binary(Topic) -> + match_node(root, emqttd_topic:words(Topic), []). + +%%------------------------------------------------------------------------------ +%% @doc Delete topic to trie tree. +%% @end +%%------------------------------------------------------------------------------ +delete(Topic) when is_binary(Topic) -> + case mnesia:read(trie_node, Topic) of + [#trie_node{edge_count=0}] -> + mnesia:delete({trie_node, Topic}), + delete_path(lists:reverse(emqttd_topic:triples(Topic))); + [TrieNode] -> + mnesia:write(TrieNode#trie_node{topic=Topic}); + [] -> + ignore + end. + + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% Add one path to trie tree. +%% +%% @end +%%------------------------------------------------------------------------------ +add_path({Node, Word, Child}) -> + Edge = #trie_edge{node_id=Node, word=Word}, + case mnesia:read(trie_node, Node) of + [TrieNode = #trie_node{edge_count=Count}] -> + case mnesia:wread(trie, Edge) of + [] -> + mnesia:write(TrieNode#trie_node{edge_count=Count+1}), + mnesia:write(#trie{edge=Edge, node_id=Child}); + [_] -> + ok + end; + [] -> + mnesia:write(#trie_node{node_id=Node, edge_count=1}), + mnesia:write(#trie{edge=Edge, node_id=Child}) + end. + +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% Match node include word or '+'. +%% +%% @end +%%------------------------------------------------------------------------------ +match_node(NodeId, [], ResAcc) -> + mnesia:read(trie_node, NodeId) ++ 'match_#'(NodeId, ResAcc); + +match_node(NodeId, [W|Words], ResAcc) -> + lists:foldl(fun(WArg, Acc) -> + case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of + [#trie{node_id=ChildId}] -> match_node(ChildId, Words, Acc); + [] -> Acc + end + end, 'match_#'(NodeId, ResAcc), [W, '+']). + +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% Match '#'. +%% +%% @end +%%------------------------------------------------------------------------------ +'match_#'(NodeId, ResAcc) -> + case mnesia:read(trie, #trie_edge{node_id=NodeId, word = '#'}) of + [#trie{node_id=ChildId}] -> + mnesia:read(trie_node, ChildId) ++ ResAcc; + [] -> + ResAcc + end. + +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% Delete paths from trie tree. +%% +%% @end +%%------------------------------------------------------------------------------ +delete_path([]) -> + ok; +delete_path([{NodeId, Word, _} | RestPath]) -> + mnesia:delete({trie, #trie_edge{node_id=NodeId, word=Word}}), + case mnesia:read(trie_node, NodeId) of + [#trie_node{edge_count=1, topic=undefined}] -> + mnesia:delete({trie_node, NodeId}), + delete_path(RestPath); + [TrieNode=#trie_node{edge_count=1, topic=_}] -> + mnesia:write(TrieNode#trie_node{edge_count=0}); + [TrieNode=#trie_node{edge_count=C}] -> + mnesia:write(TrieNode#trie_node{edge_count=C-1}); + [] -> + throw({notfound, NodeId}) + end. + diff --git a/apps/emqttd/src/emqttd_mnesia.erl b/apps/emqttd/src/emqttd_mnesia.erl index 97d6714a0..f4d7b93d1 100644 --- a/apps/emqttd/src/emqttd_mnesia.erl +++ b/apps/emqttd/src/emqttd_mnesia.erl @@ -129,13 +129,13 @@ create_table(Table, Attrs) -> %% @end %%------------------------------------------------------------------------------ copy_tables() -> - ok = copy_ram_table(topic), - ok = copy_ram_table(topic_trie), - ok = copy_ram_table(topic_trie_node), - ok = copy_ram_table(topic_subscriber), - ok = copy_ram_table(message_retained). + ok = copy_table(topic), + ok = copy_table(topic_trie), + ok = copy_table(topic_trie_node), + ok = copy_table(topic_subscriber), + ok = copy_table(message_retained). -copy_ram_table(Table) -> +copy_table(Table) -> case mnesia:add_table_copy(Table, node(), ram_copies) of {atomic, ok} -> ok; {aborted, {already_exists, Table, _Node}} -> ok; diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index d16709884..5cfd0d2ff 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -22,18 +22,12 @@ %%% @doc %%% emqttd core pubsub. %%% -%%% TODO: should not use gen_server:call to create, subscribe topics... -%%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_pubsub). -author('feng@emqtt.io'). --behaviour(gen_server). - --define(SERVER, ?MODULE). - -include("emqttd.hrl"). -include("emqttd_topic.hrl"). @@ -42,28 +36,24 @@ -include_lib("stdlib/include/qlc.hrl"). +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + %% API Exports -export([start_link/0, getstats/0]). --export([topics/0, - create/1, - subscribe/1, - unsubscribe/1, - publish/1, - publish/2, - %local node - dispatch/2, - match/1]). +-export([create/1, + subscribe/1, unsubscribe/1, + publish/1, publish/2, + %local node + dispatch/2, match/1]). %% gen_server Function Exports --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -record(state, {}). @@ -93,29 +83,19 @@ getstats() -> {'subscribers/count', mnesia:table_info(topic_subscriber, size)}, {'subscribers/max', emqttd_broker:getstat('subscribers/max')}]. -%%------------------------------------------------------------------------------ -%% @doc -%% All Topics. -%% -%% @end -%%------------------------------------------------------------------------------ --spec topics() -> list(binary()). -topics() -> - mnesia:dirty_all_keys(topic). - %%------------------------------------------------------------------------------ %% @doc %% Create static topic. %% %% @end %%------------------------------------------------------------------------------ --spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}. -create(Topic) when is_binary(Topic) -> - {atomic, ok} = mnesia:transaction(fun trie_add/1, [Topic]), ok. +-spec create(binary()) -> ok. +create(Topic) when is_binary(Topic) -> + {atomic, ok} = mnesia:transaction(fun add_topic/1, [emqttd_topic:new(Topic)]), ok. %%------------------------------------------------------------------------------ %% @doc -%% Subscribe Topic or Topics +%% Subscribe topics %% %% @end %%------------------------------------------------------------------------------ @@ -130,13 +110,15 @@ subscribe(Topics = [{_Topic, _Qos}|_]) -> subscribe([], _SubPid, Acc) -> {ok, lists:reverse(Acc)}; -%%TODO: check this function later. subscribe([{Topic, Qos}|Topics], SubPid, Acc) -> - Subscriber = #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}, - F = fun() -> trie_add(Topic), mnesia:write(Subscriber) end, + TopicObj = emqttd_topic:new(Topic), + Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = SubPid}, + F = fun() -> trie_add(TopicObj), mnesia:write(Subscriber) end, case mnesia:transaction(F) of - {atomic, ok} -> subscribe(Topics, SubPid, [Qos|Acc]); - Error -> {error, Error} + {atomic, ok} -> + subscribe(Topics, SubPid, [Qos|Acc]); + Error -> + {error, Error} end. %%------------------------------------------------------------------------------ @@ -152,7 +134,6 @@ unsubscribe(Topic) when is_binary(Topic) -> unsubscribe(Topics = [Topic|_]) when is_list(Topics) and is_binary(Topic) -> unsubscribe(Topics, self()). -%%TODO: check this function later. unsubscribe(Topics, SubPid) -> F = fun() -> Subscribers = mnesia:index_read(topic_subscriber, SubPid, #topic_subscriber.subpid), @@ -303,86 +284,19 @@ code_change(_OldVsn, State, _Extra) -> %% true -> %% ok %% end. +%% +add_topic(Topic = #topic{name = Name, node = Node}) -> + case mnesia:wread(topic, Name) of + [] -> + trie_add(Name), + mnesia:write(Topic); + Topics -> + case lists:member(Topic, Topics) of + true -> ok; + false -> mnesia:write(Topic) + end + end. -trie_add(Topic) when is_binary(Topic) -> - mnesia:write(emqttd_topic:new(Topic)), - case mnesia:read(topic_trie_node, Topic) of - [TrieNode=#topic_trie_node{topic=undefined}] -> - mnesia:write(TrieNode#topic_trie_node{topic=Topic}); - [#topic_trie_node{topic=Topic}] -> - ok; - [] -> - %add trie path - [trie_add_path(Triple) || Triple <- emqttd_topic:triples(Topic)], - %add last node - mnesia:write(#topic_trie_node{node_id=Topic, topic=Topic}) - end. - -trie_delete(Topic) when is_binary(Topic) -> - case mnesia:read(topic_trie_node, Topic) of - [#topic_trie_node{edge_count=0}] -> - mnesia:delete({topic_trie_node, Topic}), - trie_delete_path(lists:reverse(emqttd_topic:triples(Topic))); - [TrieNode] -> - mnesia:write(TrieNode#topic_trie_node{topic=Topic}); - [] -> - ignore - end. - -trie_match(Words) -> - trie_match(root, Words, []). - -trie_match(NodeId, [], ResAcc) -> - mnesia:read(topic_trie_node, NodeId) ++ 'trie_match_#'(NodeId, ResAcc); - -trie_match(NodeId, [W|Words], ResAcc) -> - lists:foldl(fun(WArg, Acc) -> - case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word=WArg}) of - [#topic_trie{node_id=ChildId}] -> trie_match(ChildId, Words, Acc); - [] -> Acc - end - end, 'trie_match_#'(NodeId, ResAcc), [W, '+']). - -'trie_match_#'(NodeId, ResAcc) -> - case mnesia:read(topic_trie, #topic_trie_edge{node_id=NodeId, word = '#'}) of - [#topic_trie{node_id=ChildId}] -> - mnesia:read(topic_trie_node, ChildId) ++ ResAcc; - [] -> - ResAcc - end. - -trie_add_path({Node, Word, Child}) -> - Edge = #topic_trie_edge{node_id=Node, word=Word}, - case mnesia:read(topic_trie_node, Node) of - [TrieNode = #topic_trie_node{edge_count=Count}] -> - case mnesia:read(topic_trie, Edge) of - [] -> - mnesia:write(TrieNode#topic_trie_node{edge_count=Count+1}), - mnesia:write(#topic_trie{edge=Edge, node_id=Child}); - [_] -> - ok - end; - [] -> - mnesia:write(#topic_trie_node{node_id=Node, edge_count=1}), - mnesia:write(#topic_trie{edge=Edge, node_id=Child}) - end. - -trie_delete_path([]) -> - ok; -trie_delete_path([{NodeId, Word, _} | RestPath]) -> - Edge = #topic_trie_edge{node_id=NodeId, word=Word}, - mnesia:delete({topic_trie, Edge}), - case mnesia:read(topic_trie_node, NodeId) of - [#topic_trie_node{edge_count=1, topic=undefined}] -> - mnesia:delete({topic_trie_node, NodeId}), - trie_delete_path(RestPath); - [TrieNode=#topic_trie_node{edge_count=1, topic=_}] -> - mnesia:write(TrieNode#topic_trie_node{edge_count=0}); - [TrieNode=#topic_trie_node{edge_count=C}] -> - mnesia:write(TrieNode#topic_trie_node{edge_count=C-1}); - [] -> - throw({notfound, NodeId}) - end. upstats() -> upstats(topic), upstats(subscribe). diff --git a/apps/emqttd/src/emqttd_retained.erl b/apps/emqttd/src/emqttd_retained.erl index 5a7a4743f..d3f347694 100644 --- a/apps/emqttd/src/emqttd_retained.erl +++ b/apps/emqttd/src/emqttd_retained.erl @@ -85,10 +85,10 @@ env() -> CPid :: pid(). redeliver(Topics, CPid) when is_pid(CPid) -> lists:foreach(fun(Topic) -> - case emqttd_topic:type(#topic{name=Topic}) of - direct -> + case emqttd_topic:wildcard(Topic) of + false -> dispatch(CPid, mnesia:dirty_read(message_retained, Topic)); - wildcard -> + true -> Fun = fun(Msg = #message_retained{topic = Name}, Acc) -> case emqttd_topic:match(Name, Topic) of true -> [Msg|Acc]; diff --git a/apps/emqttd/src/emqttd_topic.erl b/apps/emqttd/src/emqttd_topic.erl index 6b1c3dc7a..2f3aa0f2e 100644 --- a/apps/emqttd/src/emqttd_topic.erl +++ b/apps/emqttd/src/emqttd_topic.erl @@ -32,7 +32,9 @@ -import(lists, [reverse/1]). --export([new/1, type/1, match/2, validate/1, triples/1, words/1]). +-export([new/1, new/2, wildcard/1, match/2, validate/1, triples/1, words/1]). + +-type type() :: static | dynamic. -type word() :: '' | '+' | '#' | binary(). @@ -40,7 +42,7 @@ -type triple() :: {root | binary(), word(), binary()}. --export_type([word/0, triple/0]). +-export_type([type/0, word/0, triple/0]). -define(MAX_TOPIC_LEN, 65535). @@ -56,24 +58,34 @@ new(Name) when is_binary(Name) -> %%%----------------------------------------------------------------------------- %% @doc -%% Topic Type: direct or wildcard +%% New Topic with Type %% %% @end %%%----------------------------------------------------------------------------- --spec type(topic() | binary()) -> direct | wildcard. -type(#topic{ name = Name }) when is_binary(Name) -> - type(Name); -type(Topic) when is_binary(Topic) -> - type2(words(Topic)). +-spec new(type(), binary()) -> topic(). +new(Type, Name) when (Type =:= static orelse Type =:= dynamic) andalso is_binary(Name) -> + #topic{name = Name, type = Type, node = node()}. -type2([]) -> - direct; -type2(['#'|_]) -> - wildcard; -type2(['+'|_]) -> - wildcard; -type2([_H |T]) -> - type2(T). +%%%----------------------------------------------------------------------------- +%% @doc +%% Is Wildcard Topic. +%% +%% @end +%%%----------------------------------------------------------------------------- +-spec wildcard(topic() | binary()) -> true | false. +wildcard(#topic{name = Name}) when is_binary(Name) -> + wildcard(Name); +wildcard(Topic) when is_binary(Topic) -> + wildcard2(words(Topic)). + +wildcard2([]) -> + false; +wildcard2(['#'|_]) -> + true; +wildcard2(['+'|_]) -> + true; +wildcard2([_H |T]) -> + wildcard2(T). %%%----------------------------------------------------------------------------- %% @doc