diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index 0f10daf63..45d2a7330 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -41,27 +41,6 @@ %%------------------------------------------------------------------------------ -type pubsub() :: publish | subscribe. -%%------------------------------------------------------------------------------ -%% MQTT Topic -%%------------------------------------------------------------------------------ --record(mqtt_topic, { - name :: binary(), - node :: node() -}). - --type mqtt_topic() :: #mqtt_topic{}. - -%%------------------------------------------------------------------------------ -%% MQTT Subscriber -%%------------------------------------------------------------------------------ --record(mqtt_subscriber, { - topic :: binary(), - qos = 0 :: 0 | 1 | 2, - subpid :: pid() -}). - --type mqtt_subscriber() :: #mqtt_subscriber{}. - %%------------------------------------------------------------------------------ %% MQTT Client %%------------------------------------------------------------------------------ diff --git a/apps/emqttd/include/emqttd_packet.hrl b/apps/emqttd/include/emqttd_packet.hrl index c8c3c1efb..b173b885e 100644 --- a/apps/emqttd/include/emqttd_packet.hrl +++ b/apps/emqttd/include/emqttd_packet.hrl @@ -45,6 +45,7 @@ -define(QOS_0, 0). -define(QOS_1, 1). -define(QOS_2, 2). +-define(QOS_ERR, 128). -define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)). diff --git a/apps/emqttd/include/emqttd_topic.hrl b/apps/emqttd/include/emqttd_topic.hrl new file mode 100644 index 000000000..a98dc791a --- /dev/null +++ b/apps/emqttd/include/emqttd_topic.hrl @@ -0,0 +1,48 @@ +%%------------------------------------------------------------------------------ +%% Copyright (c) 2012-2015, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ +%%% @doc +%%% emqtt topic header. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +%%------------------------------------------------------------------------------ +%% MQTT Topic +%%------------------------------------------------------------------------------ +-record(topic, { + name :: binary(), + node :: node() +}). + +-type topic() :: #topic{}. + +%%------------------------------------------------------------------------------ +%% MQTT Topic Subscriber +%%------------------------------------------------------------------------------ +-record(topic_subscriber, { + topic :: binary(), + qos = 0 :: 0 | 1 | 2, + subpid :: pid() +}). + +-type topic_subscriber() :: #topic_subscriber{}. + diff --git a/apps/emqttd/src/emqttd_mnesia.erl b/apps/emqttd/src/emqttd_mnesia.erl index bc5bc098e..b0b6807ff 100644 --- a/apps/emqttd/src/emqttd_mnesia.erl +++ b/apps/emqttd/src/emqttd_mnesia.erl @@ -85,27 +85,9 @@ init_tables() -> %%------------------------------------------------------------------------------ create_tables() -> %% trie tree tables - ok = create_table(topic_trie_node, [ - {ram_copies, [node()]}, - {record_name, topic_trie_node}, - {attributes, record_info(fields, topic_trie_node)}]), - ok = create_table(topic_trie, [ - {ram_copies, [node()]}, - {record_name, topic_trie}, - {attributes, record_info(fields, topic_trie)}]), - %% topic table - ok = create_table(topic, [ - {type, bag}, - {ram_copies, [node()]}, - {record_name, topic}, - {attributes, record_info(fields, topic)}]), - %% local subscriber table, not shared with other nodes - ok = create_table(topic_subscriber, [ - {type, bag}, - {ram_copies, [node()]}, - {attributes, record_info(fields, topic_subscriber)}, - {index, [subpid]}, - {local_content, true}]), + %%TODO: should use module 'mnesia_create' attribute... + ok = emqttd_trie:mnesia(create), + ok = emqttd_pubsub:mnesia(create), %% TODO: retained messages, this table should not be copied... ok = create_table(message_retained, [ {type, ordered_set}, diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index f29438804..744cf007d 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd core pubsub. +%%% emqttd pubsub. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -30,12 +30,20 @@ -include("emqttd.hrl"). +-include("emqttd_topic.hrl"). + -include("emqttd_packet.hrl"). -behaviour(gen_server). -define(SERVER, ?MODULE). +%% Mnesia Callbacks +-export([mnesia/1]). + +-mnesia_create({mnesia, [create]}). +-mnesia_replicate({mnesia, [replicate]}). + %% API Exports -export([start_link/0]). @@ -49,7 +57,30 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {}). +-record(state, {submap :: map()}). + +%%%============================================================================= +%%% Mnesia callbacks +%%%============================================================================= +mnesia(create) -> + %% topic table + ok = emqttd_mnesia:create_table(topic, [ + {type, bag}, + {ram_copies, [node()]}, + {record_name, topic}, + {attributes, record_info(fields, topic)}]), + %% local subscriber table, not shared with other nodes + ok = emqttd_mnesia:create_table(topic_subscriber, [ + {type, bag}, + {ram_copies, [node()]}, + {record_name, topic_subscriber}, + {attributes, record_info(fields, topic_subscriber)}, + {index, [subpid]}, + {local_content, true}]); + +mnesia(replicate) -> + ok = emqttd_mnesia:copy_table(topic), + ok = emqttd_mnesia:copy_table(topic_subscriber). %%%============================================================================= %%% API @@ -84,13 +115,20 @@ create(Topic) when is_binary(Topic) -> -spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when Topic :: binary(), Qos :: mqtt_qos(). -subscribe(Topics = [{Topic, Qos}|_]) when is_binary(Topic) andalso ?IS_QOS(Qos) -> - subscribe2(Topics, []). +subscribe(Topics = [{_Topic, _Qos}|_]) -> + {ok, lists:map(fun({Topic, Qos}) -> + case subscribe(Topic, Qos) of + {ok, GrantedQos} -> + GrantedQos; + Error -> + lager:error("Failed to subscribe '~s': ~p", [Topic, Error]), ?QOS_ERR + end + end, Topics)}. -spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}. subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> TopicRecord = emqttd_topic:new(Topic), - Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, subpid = self()}, + Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = self()}, F = fun() -> case insert_topic(TopicRecord) of ok -> insert_subscriber(Subscriber); @@ -99,20 +137,9 @@ subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> end, case mnesia:transaction(F) of {atomic, ok} -> {ok, Qos}; - Error -> Error + {aborted, Reason} -> {error, Reason} end. -subscribe2([], QosAcc) -> - {ok, lists:reverse(QosAcc)}; -subscribe2([{Topic, Qos}|Topics], Acc) -> - case subscribe(Topic, Qos) of - {ok, GrantedQos} -> - subscribe2(Topics, [GrantedQos|Acc]); - Error -> - Error - end. - - %%------------------------------------------------------------------------------ %% @doc %% Unsubscribe Topic or Topics @@ -121,24 +148,17 @@ subscribe2([{Topic, Qos}|Topics], Acc) -> %%------------------------------------------------------------------------------ -spec unsubscribe(binary() | list(binary())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> - unsubscribe([Topic]); + SubPid = self(), + TopicRecord = emqttd_topic:new(Topic), + F = fun() -> + Pattern = #topic_subscriber{topic = Topic, _ = '_', subpid = SubPid}, + [mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)], + try_remove_topic(TopicRecord) + end, + {atomic, _} = mneisa:transaction(F), ok; -unsubscribe(Topics = [Topic|_]) when is_list(Topics) and is_binary(Topic) -> - unsubscribe(Topics, self()). - -unsubscribe(Topics, SubPid) -> - F = fun() -> - Subscribers = mnesia:index_read(topic_subscriber, SubPid, #topic_subscriber.subpid), - lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) -> - case lists:member(Topic, Topics) of - true -> mneisa:delete_object(Sub); - false -> ok - end - end, Subscribers) - %TODO: try to remove topic??? if topic is dynamic... - %%try_remove_topic(Topic) - end, - {atomic, _} = mneisa:transaction(F), ok. +unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> + lists:foreach(fun(T) -> unsubscribe(T) end, Topics). %%------------------------------------------------------------------------------ %% @doc @@ -152,14 +172,12 @@ publish(Msg=#mqtt_message{topic=Topic}) -> -spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any(). publish(Topic, Msg) when is_binary(Topic) -> - Count = - lists:foldl(fun(#topic{name=Name, node=Node}, Acc) -> + lists:foreach(fun(#topic{name=Name, node=Node}) -> case Node =:= node() of - true -> dispatch(Name, Msg) + Acc; - false -> rpc:call(Node, ?MODULE, dispatch, [Name, Msg]) + Acc + true -> dispatch(Name, Msg); + false -> rpc:cast(Node, ?MODULE, dispatch, [Name, Msg]) end - end, 0, match(Topic)), - dropped(Count =:= 0). + end, match(Topic)). %%------------------------------------------------------------------------------ %% @doc @@ -169,16 +187,20 @@ publish(Topic, Msg) when is_binary(Topic) -> %%------------------------------------------------------------------------------ -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> - Subscribers = mnesia:dirty_read(topic_subscriber, Topic), - lists:foreach( - fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) -> - Msg1 = if - Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; - true -> Msg - end, - SubPid ! {dispatch, {self(), Msg1}} - end, Subscribers), - length(Subscribers). + case mnesia:dirty_read(topic_subscriber, Topic) of + [] -> + %%TODO: not right when clusted... + setstats(dropped); + Subscribers -> + lists:foreach( + fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) -> + Msg1 = if + Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; + true -> Msg + end, + SubPid ! {dispatch, {self(), Msg1}} + end, Subscribers) + end. %%------------------------------------------------------------------------------ %% @doc @@ -189,60 +211,78 @@ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> %%------------------------------------------------------------------------------ -spec match(Topic :: binary()) -> [topic()]. match(Topic) when is_binary(Topic) -> - TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqttd_topic:words(Topic)]), - Names = [Name || #topic_trie_node{topic=Name} <- TrieNodes, Name=/= undefined], - lists:flatten([mnesia:dirty_read(topic, Name) || Name <- Names]). + MatchedTopics = mnesia:async_dirty(fun emqttd_trie:find/1, [Topic]), + lists:flatten([mnesia:dirty_read(topic, Name) || Name <- MatchedTopics]). -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= init([]) -> + %%TODO: really need? + process_flag(priority, high), + process_flag(min_heap_size, 1024*1024), mnesia:subscribe({table, topic, simple}), - %% trie and topic tables, will be copied by all nodes. mnesia:subscribe({table, topic_subscriber, simple}), - {ok, #state{}}. + {ok, #state{submap = maps:new()}}. handle_call(Req, _From, State) -> - lager:error("Bad Req: ~p", [Req]), - {reply, error, State}. + lager:error("Bad Request: ~p", [Req]), + {reply, {error, badreq}, State}. handle_cast(Msg, State) -> lager:error("Bad Msg: ~p", [Msg]), {noreply, State}. -%% a new record has been written. -handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, State) -> - %%TODO: rewrite... - erlang:monitor(process, Pid), - upstats(subscriber), +handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, + State = #state{submap = SubMap}) -> + case maps:is_key(Pid, SubMap) of + false -> + maps:put(Pid, erlang:monitor(process, Pid)); + true -> + ignore + end, + setstats(subscribers), {noreply, State}; -%% TODO:... handle_info({mnesia_table_event, {write, #topic{}, _ActivityId}}, State) -> - upstats(topic), + %%TODO: this is not right when clusterd. + setstats(topics), {noreply, State}; %% {write, #topic{}, _ActivityId} %% {delete_object, _OldRecord, _ActivityId} %% {delete, {Tab, Key}, ActivityId} handle_info({mnesia_table_event, _Event}, State) -> - upstats(), + setstats(topics), + setstats(subscribers), {noreply, State}; -handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> - F = fun() -> - %%TODO: how to read with write lock? - [mnesia:delete_object(Sub) || Sub <- mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid)] - %%TODO: try to remove dynamic topics without subscribers - %% [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs] - end, - case catch mnesia:transaction(F) of - {atomic, _} -> ok; - {aborted, Reason} -> lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]) - end, - upstats(), - {noreply, State}; +handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMap}) -> + case maps:is_key(DownPid, SubMap) of + true -> + Node = node(), + F = fun() -> + Subscribers = mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid), + lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) -> + mnesia:delete_object(Sub), + try_remove_topic(#topic{name = Topic, node = Node}) + end, Subscribers) + end, + NewState = + case catch mnesia:transaction(F) of + {atomic, _} -> + State#state{submap = maps:remove(DownPid, SubMap)}; + {aborted, Reason} -> + lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]), + State + end, + setstats(topics), setstats(subscribers), + {noreply, NewState}; + false -> + lager:error("Unexpected 'DOWN' from ~p", [DownPid]), + {noreply, State} + end; handle_info(Info, State) -> lager:error("Unexpected Info: ~p", [Info]), @@ -260,27 +300,10 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= - -%%try_remove_topic(Name) when is_binary(Name) -> -%% case ets:member(topic_subscriber, Name) of -%% false -> -%% Topic = emqttd_topic:new(Name), -%% Fun = fun() -> -%% mnesia:delete_object(Topic), -%% case mnesia:read(topic, Name) of -%% [] -> trie_delete(Name); -%% _ -> ignore -%% end -%% end, -%% mnesia:transaction(Fun); -%% true -> -%% ok -%% end. -%% -insert_topic(Topic = #mqtt_topic{name = Name, node = Node}) -> +insert_topic(Topic = #topic{name = Name}) -> case mnesia:wread(topic, Name) of [] -> - trie_add(Name), + ok = emqttd_trie:insert(Name), mnesia:write(Topic); Topics -> case lists:member(Topic, Topics) of @@ -289,20 +312,30 @@ insert_topic(Topic = #mqtt_topic{name = Name, node = Node}) -> end end. +insert_subscriber(Subscriber) -> + mnesia:write(Subscriber). -upstats() -> - upstats(topic), upstats(subscribe). +try_remove_topic(Topic = #topic{name = Name}) -> + %%TODO: is this ok in transaction? + case ets:member(topic_subscriber, Name) of + false -> + mnesia:delete_object(Topic), + case mnesia:read(topic, Name) of + [] -> emqttd_trie:delete(Name); + _ -> ok + end; + true -> + ok + end. -upstats(topic) -> +setstats(topics) -> emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)); -upstats(subscribe) -> +setstats(subscribers) -> emqttd_broker:setstats('subscribers/count', 'subscribers/max', - mnesia:table_info(topic_subscriber, size)). + mnesia:table_info(topic_subscriber, size)); +setstats(dropped) -> + emqttd_metrics:inc('messages/dropped'). -dropped(true) -> - emqttd_metrics:inc('messages/dropped'); -dropped(false) -> - ok. diff --git a/apps/emqttd/src/emqttd_topic.erl b/apps/emqttd/src/emqttd_topic.erl index f7584d901..33b541c0d 100644 --- a/apps/emqttd/src/emqttd_topic.erl +++ b/apps/emqttd/src/emqttd_topic.erl @@ -28,7 +28,7 @@ -author('feng@emqtt.io'). --include("emqttd.hrl"). +-include("emqttd_topic.hrl"). -import(lists, [reverse/1]). @@ -52,9 +52,9 @@ %% %% @end %%%----------------------------------------------------------------------------- --spec new(binary()) -> mqtt_topic(). +-spec new(binary()) -> topic(). new(Name) when is_binary(Name) -> - #mqtt_topic{name = Name, node = node()}. + #topic{name = Name, node = node()}. %%%----------------------------------------------------------------------------- %% @doc @@ -62,8 +62,8 @@ new(Name) when is_binary(Name) -> %% %% @end %%%----------------------------------------------------------------------------- --spec wildcard(mqtt_topic() | binary()) -> true | false. -wildcard(#mqtt_topic{name = Name}) when is_binary(Name) -> +-spec wildcard(topic() | binary()) -> true | false. +wildcard(#topic{name = Name}) when is_binary(Name) -> wildcard(Name); wildcard(Topic) when is_binary(Topic) -> wildcard(words(Topic)); diff --git a/apps/emqttd/src/emqtt_trie.erl b/apps/emqttd/src/emqttd_trie.erl similarity index 96% rename from apps/emqttd/src/emqtt_trie.erl rename to apps/emqttd/src/emqttd_trie.erl index e8e326ab4..f86768048 100644 --- a/apps/emqttd/src/emqtt_trie.erl +++ b/apps/emqttd/src/emqttd_trie.erl @@ -99,6 +99,7 @@ mnesia(replicate) -> %% %% @end %%------------------------------------------------------------------------------ +-spec insert(Topic :: binary()) -> ok. insert(Topic) when is_binary(Topic) -> case mnesia:read(trie_node, Topic) of [#trie_node{topic=Topic}] -> @@ -118,8 +119,10 @@ insert(Topic) when is_binary(Topic) -> %% %% @end %%------------------------------------------------------------------------------ +-spec find(Topic :: binary()) -> list(MatchedTopic :: binary()). find(Topic) when is_binary(Topic) -> - match_node(root, emqttd_topic:words(Topic), []). + TrieNodes = match_node(root, emqttd_topic:words(Topic), []), + [Name || #trie_node{topic=Name} <- TrieNodes, Name=/= undefined]. %%------------------------------------------------------------------------------ %% @doc @@ -127,6 +130,7 @@ find(Topic) when is_binary(Topic) -> %% %% @end %%------------------------------------------------------------------------------ +-spec delete(Topic :: binary()) -> ok. delete(Topic) when is_binary(Topic) -> case mnesia:read(trie_node, Topic) of [#trie_node{edge_count=0}] -> @@ -135,7 +139,7 @@ delete(Topic) when is_binary(Topic) -> [TrieNode] -> mnesia:write(TrieNode#trie_node{topic=Topic}); [] -> - ignore + ok end. %%%=============================================================================