From 21d456fd1fff6cc66170dac1af287d1c106560ac Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 22 Mar 2015 22:35:50 +0800 Subject: [PATCH] rewrite pubsub --- apps/emqttd/src/emqttd_bridge.erl | 2 +- apps/emqttd/src/emqttd_broker.erl | 4 +- apps/emqttd/src/emqttd_metrics.erl | 2 +- apps/emqttd/src/emqttd_pubsub.erl | 130 ++++++++++++++++------------- apps/emqttd/src/emqttd_session.erl | 4 +- 5 files changed, 78 insertions(+), 64 deletions(-) diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index d80979516..765f988e1 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -81,7 +81,7 @@ init([Node, SubTopic, Options]) -> true -> true = erlang:monitor_node(Node, true), State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}), - emqttd_pubsub:subscribe({SubTopic, ?QOS_0}, self()), + emqttd_pubsub:subscribe({SubTopic, ?QOS_0}), {ok, State}; false -> {stop, {cannot_connect, Node}} diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index ea73efa3e..b5f23d156 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -150,8 +150,8 @@ init([Options]) -> Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB, [ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics], % Create $SYS Topics - [{atomic, _} = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS], - [{atomic, _} = create(systop(Topic)) || Topic <- Topics], + [ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS], + [ok = create(systop(Topic)) || Topic <- Topics], SysInterval = proplists:get_value(sys_interval, Options, 60), State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, Delay = if diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index ad1b4b85d..3858ec441 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -182,7 +182,7 @@ init([Options]) -> % Init metrics [new_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics - [{atomic, _} = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], + [ok = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], PubInterval = proplists:get_value(pub_interval, Options, 60), Delay = if PubInterval == 0 -> 0; diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 6521b85eb..e77c181e0 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -109,7 +109,7 @@ topics() -> %%------------------------------------------------------------------------------ -spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}. create(Topic) when is_binary(Topic) -> - mnesia:transaction(fun trie_add/1, [Topic]). + {atomic, ok} = mnesia:transaction(fun trie_add/1, [Topic]), ok. %%------------------------------------------------------------------------------ %% @doc @@ -128,21 +128,13 @@ subscribe(Topics = [{_Topic, _Qos}|_]) -> subscribe([], _SubPid, Acc) -> {ok, lists:reverse(Acc)}; +%%TODO: check this function later. subscribe([{Topic, Qos}|Topics], SubPid, Acc) -> Subscriber = #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}, - F = fun() -> - case trie_add(Topic) of - ok -> - mnesia:write(Subscriber); - {atomic, already_exist} -> - mnesia:write(Subscriber); - {aborted, Reason} -> - {aborted, Reason} - end - end, + F = fun() -> trie_add(Topic), mnesia:write(Subscriber) end, case mnesia:transaction(F) of - ok -> subscribe(Topics, SubPid, [Qos|Acc]); - {aborted, Reason} -> {error, {aborted, Reason}} + {atomic, ok} -> subscribe(Topics, SubPid, [Qos|Acc]); + Error -> {error, Error} end. %%------------------------------------------------------------------------------ @@ -153,14 +145,25 @@ subscribe([{Topic, Qos}|Topics], SubPid, Acc) -> %%------------------------------------------------------------------------------ -spec unsubscribe(binary() | list(binary())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> - %% call mnesia directly unsubscribe([Topic]); -unsubscribe(Topics = [Topics|_]) when is_list(Topics) and is_binary(Topic) -> +unsubscribe(Topics = [Topic|_]) when is_list(Topics) and is_binary(Topic) -> unsubscribe(Topics, self()). +%%TODO: check this function later. unsubscribe(Topics, SubPid) -> - + F = fun() -> + Subscribers = mnesia:index_read(topic_subscriber, SubPid, #topic_subscriber.subpid), + lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) -> + case lists:member(Topic, Topics) of + true -> mneisa:delete_object(Sub); + false -> ok + end + end, Subscribers) + %TODO: try to remove topic??? if topic is dynamic... + %%try_remove_topic(Topic) + end, + {atomic, _} = mneisa:transaction(F), ok. %%------------------------------------------------------------------------------ %% @doc @@ -191,7 +194,7 @@ publish(Topic, Msg) when is_binary(Topic) -> %%------------------------------------------------------------------------------ -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> - Subscribers = ets:lookup(topic_subscriber, Topic), + Subscribers = mnesia:dirty_read(topic_subscriber, Topic), lists:foreach( fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) -> Msg1 = if @@ -235,15 +238,16 @@ init([]) -> {ram_copies, [node()]}, {attributes, record_info(fields, topic)}]), mnesia:add_table_copy(topic, node(), ram_copies), - + mnesia:subscribe({table, topic, simple}), %% local table, not shared with other table mnesia:create_table(topic_subscriber, [ {type, bag}, {record_name, topic_subscriber}, - {ram_copies, [node()]}, + {ram_copies, [node()]}, {attributes, record_info(fields, topic_subscriber)}, - {index, [subpid]}, + {index, [subpid]}, {local_content, true}]), + mnesia:subscribe({table, topic_subscriber, simple}), {ok, #state{}}. handle_call(getstats, _From, State = #state{max_subs = Max}) -> @@ -253,35 +257,45 @@ handle_call(getstats, _From, State = #state{max_subs = Max}) -> {reply, Stats, State}; handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. - -handle_cast({unsubscribe, Topics, SubPid}, State) -> - lists:foreach(fun(Topic) -> - ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}), - try_remove_topic(Topic) - end, Topics), - {noreply, setstats(State)}; + lager:error("Bad Req: ~p", [Req]), + {reply, error, State}. handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. + lager:error("Bad Msg: ~p", [Msg]), + {noreply, State}. -handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) -> - case get({submon, Mon}) of - undefined -> - lager:error("unexpected 'DOWN': ~p", [Mon]); - SubPid -> - erase({submon, Mon}), - erase({subscriber, SubPid}), - Subs = ets:match_object(topic_subscriber, #topic_subscriber{subpid=SubPid, _='_'}), - [ets:delete_object(topic_subscriber, Sub) || Sub <- Subs], - [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs] - end, +%% a new record has been written. +handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, State) -> + erlang:monitor(process, Pid), + {noreply, setstats(State)}; + +%% {write, #topic{}, _ActivityId} +%% {delete_object, _OldRecord, _ActivityId} +%% {delete, {Tab, Key}, ActivityId} +handle_info({mnesia_table_event, _Event}, State) -> + {noreply, setstats(State)}; + +handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> + F = fun() -> + %%TODO: how to read with write lock? + [mnesia:delete_object(Sub) || Sub <- mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid)] + %%TODO: try to remove dynamic topics without subscribers + %% [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs] + end, + case catch mnesia:transaction(F) of + {atomic, _} -> ok; + {aborted, Reason} -> lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]) + end, {noreply, setstats(State)}; handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. + lager:error("Bad Info: ~p", [Info]), + {noreply, State}. terminate(_Reason, _State) -> + mnesia:unsubscribe({table, topic, simple}), + mnesia:unsubscribe({table, topic_subscriber, simple}), + %%TODO: clear topics belongs to this node??? ok. code_change(_OldVsn, State, _Extra) -> @@ -291,21 +305,21 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -try_remove_topic(Name) when is_binary(Name) -> - case ets:member(topic_subscriber, Name) of - false -> - Topic = emqttd_topic:new(Name), - Fun = fun() -> - mnesia:delete_object(Topic), - case mnesia:read(topic, Name) of - [] -> trie_delete(Name); - _ -> ignore - end - end, - mnesia:transaction(Fun); - true -> - ok - end. +%%try_remove_topic(Name) when is_binary(Name) -> +%% case ets:member(topic_subscriber, Name) of +%% false -> +%% Topic = emqttd_topic:new(Name), +%% Fun = fun() -> +%% mnesia:delete_object(Topic), +%% case mnesia:read(topic, Name) of +%% [] -> trie_delete(Name); +%% _ -> ignore +%% end +%% end, +%% mnesia:transaction(Fun); +%% true -> +%% ok +%% end. trie_add(Topic) when is_binary(Topic) -> mnesia:write(emqttd_topic:new(Topic)), @@ -313,7 +327,7 @@ trie_add(Topic) when is_binary(Topic) -> [TrieNode=#topic_trie_node{topic=undefined}] -> mnesia:write(TrieNode#topic_trie_node{topic=Topic}); [#topic_trie_node{topic=Topic}] -> - {atomic, already_exist}; + ok; [] -> %add trie path [trie_add_path(Triple) || Triple <- emqttd_topic:triples(Topic)], @@ -389,7 +403,7 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) -> setstats(State = #state{max_subs = Max}) -> emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)), - SubCount = ets:info(topic_subscriber, size), + SubCount = mnesia:table_info(topic_subscriber, size), emqttd_broker:setstat('subscribers/count', SubCount), if SubCount > Max -> diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 9754f8bd7..bf9831135 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -185,7 +185,7 @@ subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Top _ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs]) end, SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), - {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics, self()), + {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), %%TODO: should be gen_event and notification... emqttd_server:subscribe([ Name || {Name, _} <- Topics ], self()), {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; @@ -208,7 +208,7 @@ unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, T BadUnsubs -> lager:warning("~s should not unsubscribe ~p", [ClientId, BadUnsubs]) end, %%unsubscribe from topic tree - ok = emqttd_pubsub:unsubscribe(Topics, self()), + ok = emqttd_pubsub:unsubscribe(Topics), SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics), {ok, SessState#session_state{submap = SubMap1}};