diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index ebdd8fe4c..6521b85eb 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -48,8 +48,8 @@ -export([topics/0, create/1, - subscribe/2, - unsubscribe/2, + subscribe/1, + unsubscribe/1, publish/1, publish/2, %local node @@ -108,8 +108,8 @@ topics() -> %% @end %%------------------------------------------------------------------------------ -spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}. -create(Topic) -> - gen_server:call(?SERVER, {create, Topic}). +create(Topic) when is_binary(Topic) -> + mnesia:transaction(fun trie_add/1, [Topic]). %%------------------------------------------------------------------------------ %% @doc @@ -117,16 +117,33 @@ create(Topic) -> %% %% @end %%------------------------------------------------------------------------------ --spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}. -subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) -> - subscribe([{Topic, Qos}], SubPid); +-spec subscribe({binary(), mqtt_qos()} | list()) -> {ok, list(mqtt_qos())}. +subscribe({Topic, Qos}) when is_binary(Topic) -> + case subscribe([{Topic, Qos}]) of + {ok, [GrantedQos]} -> {ok, GrantedQos}; + {error, Error} -> {error, Error} + end; +subscribe(Topics = [{_Topic, _Qos}|_]) -> + subscribe(Topics, self(), []). -%% TODO: -%% call will not work when there are 2000K+ clients, 100+ sub requests/sec... -%% will optimize in 0.9.x... -%% -subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> - gen_server:call(?SERVER, {subscribe, Topics, SubPid}, infinity). +subscribe([], _SubPid, Acc) -> + {ok, lists:reverse(Acc)}; +subscribe([{Topic, Qos}|Topics], SubPid, Acc) -> + Subscriber = #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}, + F = fun() -> + case trie_add(Topic) of + ok -> + mnesia:write(Subscriber); + {atomic, already_exist} -> + mnesia:write(Subscriber); + {aborted, Reason} -> + {aborted, Reason} + end + end, + case mnesia:transaction(F) of + ok -> subscribe(Topics, SubPid, [Qos|Acc]); + {aborted, Reason} -> {error, {aborted, Reason}} + end. %%------------------------------------------------------------------------------ %% @doc @@ -134,12 +151,16 @@ subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> %% %% @end %%------------------------------------------------------------------------------ --spec unsubscribe(binary() | list(binary()), pid()) -> ok. -unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) -> - unsubscribe([Topic], SubPid); +-spec unsubscribe(binary() | list(binary())) -> ok. +unsubscribe(Topic) when is_binary(Topic) -> + %% call mnesia directly + unsubscribe([Topic]); + +unsubscribe(Topics = [Topics|_]) when is_list(Topics) and is_binary(Topic) -> + unsubscribe(Topics, self()). + +unsubscribe(Topics, SubPid) -> -unsubscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> - gen_server:cast(?SERVER, {unsubscribe, Topics, SubPid}). %%------------------------------------------------------------------------------ %% @doc @@ -199,42 +220,38 @@ match(Topic) when is_binary(Topic) -> %% ------------------------------------------------------------------ init([]) -> - mnesia:create_table(topic_trie, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, topic_trie)}]), + %% trie and topic tables, will be copied by all nodes. mnesia:create_table(topic_trie_node, [ {ram_copies, [node()]}, {attributes, record_info(fields, topic_trie_node)}]), + mnesia:add_table_copy(topic_trie_node, node(), ram_copies), + mnesia:create_table(topic_trie, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, topic_trie)}]), + mnesia:add_table_copy(topic_trie, node(), ram_copies), mnesia:create_table(topic, [ {type, bag}, {record_name, topic}, {ram_copies, [node()]}, {attributes, record_info(fields, topic)}]), - mnesia:add_table_copy(topic_trie, node(), ram_copies), - mnesia:add_table_copy(topic_trie_node, node(), ram_copies), mnesia:add_table_copy(topic, node(), ram_copies), - ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]), + + %% local table, not shared with other table + mnesia:create_table(topic_subscriber, [ + {type, bag}, + {record_name, topic_subscriber}, + {ram_copies, [node()]}, + {attributes, record_info(fields, topic_subscriber)}, + {index, [subpid]}, + {local_content, true}]), {ok, #state{}}. handle_call(getstats, _From, State = #state{max_subs = Max}) -> Stats = [{'topics/count', mnesia:table_info(topic, size)}, - {'subscribers/count', ets:info(topic_subscriber, size)}, + {'subscribers/count', mnesia:info(topic_subscriber, size)}, {'subscribers/max', Max}], {reply, Stats, State}; -handle_call({create, Topic}, _From, State) -> - Result = mnesia:transaction(fun trie_add/1, [Topic]), - {reply, Result, setstats(State)}; - -handle_call({subscribe, Topics, SubPid}, _From, State) -> - Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics], - Reply = - case [Err || Err = {error, _} <- Result] of - [] -> {ok, [Qos || {ok, Qos} <- Result]}; - Errors -> hd(Errors) - end, - {reply, Reply, setstats(State)}; - handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. @@ -273,41 +290,6 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= -subscribe_topic({Topic, Qos}, SubPid) -> - case mnesia:transaction(fun trie_add/1, [Topic]) of - {atomic, _} -> - case get({subscriber, SubPid}) of - undefined -> - %%TODO: refactor later... - MonRef = erlang:monitor(process, SubPid), - put({subcriber, SubPid}, MonRef), - put({submon, MonRef}, SubPid); - _ -> - already_monitored - end, - %% remove duplicated subscribers - try_remove_subscriber({Topic, Qos}, SubPid), - ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}), - %TODO: GrantedQos?? - {ok, Qos}; - {aborted, Reason} -> - {error, Reason} - end. - -try_remove_subscriber({Topic, Qos}, SubPid) -> - case ets:lookup(topic_subscriber, Topic) of - [] -> - not_found; - Subs -> - DupSubs = [Sub || Sub = #topic_subscriber{qos = OldQos, subpid = OldPid} - <- Subs, Qos =/= OldQos, OldPid =:= SubPid], - case DupSubs of - [] -> ok; - [DupSub] -> - lager:warning("PubSub: remove duplicated subscriber ~p", [DupSub]), - ets:delete(topic_subscriber, DupSub) - end - end. try_remove_topic(Name) when is_binary(Name) -> case ets:member(topic_subscriber, Name) of