diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index f92ff58eb..0f10daf63 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -56,7 +56,7 @@ %%------------------------------------------------------------------------------ -record(mqtt_subscriber, { topic :: binary(), - qos = 0 :: non_neg_integer(), + qos = 0 :: 0 | 1 | 2, subpid :: pid() }). diff --git a/apps/emqttd/src/emqtt_trie.erl b/apps/emqttd/src/emqtt_trie.erl index 96d1bdd68..e8e326ab4 100644 --- a/apps/emqttd/src/emqtt_trie.erl +++ b/apps/emqttd/src/emqtt_trie.erl @@ -62,7 +62,9 @@ %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc Create trie tables. +%% @doc +%% Create trie tables. +%% %% @end %%------------------------------------------------------------------------------ -spec mnesia(create | replicate) -> ok. @@ -78,7 +80,9 @@ mnesia(create) -> {attributes, record_info(fields, trie_node)}]); %%------------------------------------------------------------------------------ -%% @doc Replicate trie tables. +%% @doc +%% Replicate trie tables. +%% %% @end %%------------------------------------------------------------------------------ mnesia(replicate) -> @@ -90,7 +94,9 @@ mnesia(replicate) -> %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc Insert topic to trie tree. +%% @doc +%% Insert topic to trie tree. +%% %% @end %%------------------------------------------------------------------------------ insert(Topic) when is_binary(Topic) -> @@ -107,14 +113,18 @@ insert(Topic) when is_binary(Topic) -> end. %%------------------------------------------------------------------------------ -%% @doc Find trie nodes that match topic. +%% @doc +%% Find trie nodes that match topic. +%% %% @end %%------------------------------------------------------------------------------ find(Topic) when is_binary(Topic) -> match_node(root, emqttd_topic:words(Topic), []). %%------------------------------------------------------------------------------ -%% @doc Delete topic to trie tree. +%% @doc +%% Delete topic from trie tree. +%% %% @end %%------------------------------------------------------------------------------ delete(Topic) when is_binary(Topic) -> @@ -128,7 +138,6 @@ delete(Topic) when is_binary(Topic) -> ignore end. - %%%============================================================================= %%% Internal functions %%%============================================================================= @@ -136,7 +145,7 @@ delete(Topic) when is_binary(Topic) -> %%------------------------------------------------------------------------------ %% @doc %% @private -%% Add one path to trie tree. +%% Add path to trie tree. %% %% @end %%------------------------------------------------------------------------------ @@ -159,7 +168,7 @@ add_path({Node, Word, Child}) -> %%------------------------------------------------------------------------------ %% @doc %% @private -%% Match node include word or '+'. +%% Match node with word or '+'. %% %% @end %%------------------------------------------------------------------------------ @@ -177,7 +186,7 @@ match_node(NodeId, [W|Words], ResAcc) -> %%------------------------------------------------------------------------------ %% @doc %% @private -%% Match '#'. +%% Match node with '#'. %% %% @end %%------------------------------------------------------------------------------ diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 3b62d1413..f29438804 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -32,15 +32,12 @@ -include("emqttd_packet.hrl"). --include_lib("stdlib/include/qlc.hrl"). - -behaviour(gen_server). -define(SERVER, ?MODULE). %% API Exports - --export([start_link/0, getstats/0]). +-export([start_link/0]). -export([create/1, subscribe/1, unsubscribe/1, @@ -49,7 +46,6 @@ dispatch/2, match/1]). %% gen_server Function Exports - -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -71,25 +67,13 @@ start_link() -> %%------------------------------------------------------------------------------ %% @doc -%% Get stats of PubSub. -%% -%% @end -%%------------------------------------------------------------------------------ --spec getstats() -> [{atom(), non_neg_integer()}]. -getstats() -> - [{'topics/count', mnesia:table_info(topic, size)}, - {'subscribers/count', mnesia:table_info(topic_subscriber, size)}, - {'subscribers/max', emqttd_broker:getstat('subscribers/max')}]. - -%%------------------------------------------------------------------------------ -%% @doc -%% Create static topic. +%% Create topic. %% %% @end %%------------------------------------------------------------------------------ -spec create(binary()) -> ok. create(Topic) when is_binary(Topic) -> - {atomic, ok} = mnesia:transaction(fun add_topic/1, [emqttd_topic:new(Topic)]), ok. + {atomic, ok} = mnesia:transaction(fun insert_topic/1, [emqttd_topic:new(Topic)]), ok. %%------------------------------------------------------------------------------ %% @doc @@ -97,28 +81,38 @@ create(Topic) when is_binary(Topic) -> %% %% @end %%------------------------------------------------------------------------------ --spec subscribe({binary(), mqtt_qos()} | list()) -> {ok, list(mqtt_qos())}. -subscribe({Topic, Qos}) when is_binary(Topic) -> - case subscribe([{Topic, Qos}]) of - {ok, [GrantedQos]} -> {ok, GrantedQos}; - {error, Error} -> {error, Error} - end; -subscribe(Topics = [{_Topic, _Qos}|_]) -> - subscribe(Topics, self(), []). +-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when + Topic :: binary(), + Qos :: mqtt_qos(). +subscribe(Topics = [{Topic, Qos}|_]) when is_binary(Topic) andalso ?IS_QOS(Qos) -> + subscribe2(Topics, []). -subscribe([], _SubPid, Acc) -> - {ok, lists:reverse(Acc)}; -subscribe([{Topic, Qos}|Topics], SubPid, Acc) -> - TopicObj = emqttd_topic:new(Topic), - Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = SubPid}, - F = fun() -> trie_add(TopicObj), mnesia:write(Subscriber) end, +-spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}. +subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> + TopicRecord = emqttd_topic:new(Topic), + Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, subpid = self()}, + F = fun() -> + case insert_topic(TopicRecord) of + ok -> insert_subscriber(Subscriber); + Error -> Error + end + end, case mnesia:transaction(F) of - {atomic, ok} -> - subscribe(Topics, SubPid, [Qos|Acc]); - Error -> - {error, Error} + {atomic, ok} -> {ok, Qos}; + Error -> Error end. +subscribe2([], QosAcc) -> + {ok, lists:reverse(QosAcc)}; +subscribe2([{Topic, Qos}|Topics], Acc) -> + case subscribe(Topic, Qos) of + {ok, GrantedQos} -> + subscribe2(Topics, [GrantedQos|Acc]); + Error -> + Error + end. + + %%------------------------------------------------------------------------------ %% @doc %% Unsubscribe Topic or Topics @@ -283,7 +277,7 @@ code_change(_OldVsn, State, _Extra) -> %% ok %% end. %% -add_topic(Topic = #topic{name = Name, node = Node}) -> +insert_topic(Topic = #mqtt_topic{name = Name, node = Node}) -> case mnesia:wread(topic, Name) of [] -> trie_add(Name), diff --git a/apps/emqttd/src/emqttd_topic.erl b/apps/emqttd/src/emqttd_topic.erl index 6db37dd46..f7584d901 100644 --- a/apps/emqttd/src/emqttd_topic.erl +++ b/apps/emqttd/src/emqttd_topic.erl @@ -28,11 +28,13 @@ -author('feng@emqtt.io'). +-include("emqttd.hrl"). + -import(lists, [reverse/1]). --export([new/1, new/2, wildcard/1, match/2, validate/1, triples/1, words/1]). +-export([new/1, wildcard/1, match/2, validate/1, triples/1, words/1]). --type type() :: static | dynamic. +%-type type() :: static | dynamic. -type word() :: '' | '+' | '#' | binary(). @@ -40,7 +42,7 @@ -type triple() :: {root | binary(), word(), binary()}. --export_type([type/0, word/0, triple/0]). +-export_type([word/0, triple/0]). -define(MAX_TOPIC_LEN, 65535). @@ -50,40 +52,29 @@ %% %% @end %%%----------------------------------------------------------------------------- --spec new(binary()) -> topic(). +-spec new(binary()) -> mqtt_topic(). new(Name) when is_binary(Name) -> - #topic{name = Name, node = node()}. + #mqtt_topic{name = Name, node = node()}. %%%----------------------------------------------------------------------------- %% @doc -%% New Topic with Type +%% Is Wildcard Topic? %% %% @end %%%----------------------------------------------------------------------------- --spec new(type(), binary()) -> topic(). -new(Type, Name) when (Type =:= static orelse Type =:= dynamic) andalso is_binary(Name) -> - #topic{name = Name, type = Type, node = node()}. - -%%%----------------------------------------------------------------------------- -%% @doc -%% Is Wildcard Topic. -%% -%% @end -%%%----------------------------------------------------------------------------- --spec wildcard(topic() | binary()) -> true | false. -wildcard(#topic{name = Name}) when is_binary(Name) -> +-spec wildcard(mqtt_topic() | binary()) -> true | false. +wildcard(#mqtt_topic{name = Name}) when is_binary(Name) -> wildcard(Name); wildcard(Topic) when is_binary(Topic) -> - wildcard2(words(Topic)). - -wildcard2([]) -> + wildcard(words(Topic)); +wildcard([]) -> false; -wildcard2(['#'|_]) -> +wildcard(['#'|_]) -> true; -wildcard2(['+'|_]) -> +wildcard(['+'|_]) -> true; -wildcard2([_H |T]) -> - wildcard2(T). +wildcard([_H|T]) -> + wildcard(T). %%%----------------------------------------------------------------------------- %% @doc @@ -130,7 +121,7 @@ validate({filter, Topic}) when is_binary(Topic) -> validate2(words(Topic)); validate({name, Topic}) when is_binary(Topic) -> Words = words(Topic), - validate2(Words) and (not include_wildcard(Words)). + validate2(Words) and (not wildcard(Words)). validate2([]) -> true; @@ -155,11 +146,6 @@ validate3(<>) when C == $#; C == $+; C == 0 -> validate3(<<_/utf8, Rest/binary>>) -> validate3(Rest). -include_wildcard([]) -> false; -include_wildcard(['#'|_T]) -> true; -include_wildcard(['+'|_T]) -> true; -include_wildcard([ _ | T]) -> include_wildcard(T). - %%%----------------------------------------------------------------------------- %% @doc %% Topic to Triples. @@ -202,4 +188,3 @@ word(<<"+">>) -> '+'; word(<<"#">>) -> '#'; word(Bin) -> Bin. -