From 45b63a6b13a41915c70d309d4d0afe2a8a0d8023 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Thu, 15 Jan 2015 00:03:10 +0800 Subject: [PATCH] fix issue#39 remove old subscription with different Qos #39 --- apps/emqtt/src/emqtt_pubsub.erl | 101 +++++++++++++++++++++++--------- 1 file changed, 73 insertions(+), 28 deletions(-) diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index a3c73a9bb..51274128d 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -60,6 +60,20 @@ terminate/2, code_change/3]). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec topics() -> list(topic()). + +-spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}. + +-spec unsubscribe(binary() | list(binary()), pid()) -> ok. + +-endif. + +%%---------------------------------------------------------------------------- + -record(state, {}). %% ------------------------------------------------------------------ @@ -75,25 +89,26 @@ start_link() -> %% %% @doc All topics %% --spec topics() -> list(topic()). topics() -> mnesia:dirty_all_keys(topic). %% -%% @doc Subscribe Topic +%% @doc Subscribe Topic or Topics %% --spec subscribe({Topic :: binary(), Qos :: mqtt_qos()}, SubPid :: pid()) -> any(). subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) -> - gen_server:call(?SERVER, {subscribe, {Topic, Qos}, SubPid}). - + subscribe([{Topic, Qos}], SubPid); +subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> + gen_server:call(?SERVER, {subscribe, Topics, SubPid}). %% -%% @doc Unsubscribe Topic +%% @doc Unsubscribe Topic or Topics %% --spec unsubscribe(Topic :: binary(), SubPid :: pid()) -> any(). unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) -> - gen_server:cast(?SERVER, {unsubscribe, Topic, SubPid}). + unsubscribe([Topic], SubPid); + +unsubscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> + gen_server:cast(?SERVER, {unsubscribe, Topics, SubPid}). %% %% @doc Publish to cluster node. @@ -143,29 +158,23 @@ init([]) -> ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]), {ok, #state{}}. -handle_call({subscribe, {Topic, Qos}, SubPid}, _From, State) -> - case mnesia:transaction(fun trie_add/1, [Topic]) of - {atomic, _} -> - case get({subscriber, SubPid}) of - undefined -> - MonRef = erlang:monitor(process, SubPid), - put({subcriber, SubPid}, MonRef), - put({submon, MonRef}, SubPid); - _ -> - already_monitored - end, - ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}), - {reply, ok, State}; - {aborted, Reason} -> - {reply, {error, Reason}, State} - end; +handle_call({subscribe, Topics, SubPid}, _From, State) -> + Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics], + Reply = + case [Err || Err = {error, _} <- Result] of + [] -> {ok, [Qos || {ok, Qos} <- Result]}; + Errors -> hd(Errors) + end, + {reply, Reply, State}; handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. -handle_cast({unsubscribe, Topic, SubPid}, State) -> - ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}), - try_remove_topic(Topic), +handle_cast({unsubscribe, Topics, SubPid}, State) -> + lists:foreach(fun(Topic) -> + ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}), + try_remove_topic(Topic) + end, Topics), {noreply, State}; handle_cast(Msg, State) -> @@ -196,6 +205,42 @@ code_change(_OldVsn, State, _Extra) -> %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ +subscribe_topic({Topic, Qos}, SubPid) -> + case mnesia:transaction(fun trie_add/1, [Topic]) of + {atomic, _} -> + case get({subscriber, SubPid}) of + undefined -> + %%TODO: refactor later... + MonRef = erlang:monitor(process, SubPid), + put({subcriber, SubPid}, MonRef), + put({submon, MonRef}, SubPid); + _ -> + already_monitored + end, + %% remove duplicated subscribers + try_remove_subscriber({Topic, Qos}, SubPid), + ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}), + %TODO: GrantedQos?? + {ok, Qos}; + {aborted, Reason} -> + {error, Reason} + end. + +try_remove_subscriber({Topic, Qos}, SubPid) -> + case ets:lookup(topic_subscriber, Topic) of + [] -> + not_found; + Subs -> + DupSubs = [Sub || Sub = #topic_subscriber{qos = OldQos, subpid = OldPid} + <- Subs, Qos =/= OldQos, OldPid =:= SubPid], + case DupSubs of + [] -> ok; + [DupSub] -> + lager:warning("PubSub: remove duplicated subscriber ~p", [DupSub]), + ets:delete(topic_subscriber, DupSub) + end + end. + try_remove_topic(Name) when is_binary(Name) -> case ets:member(topic_subscriber, Name) of false -> @@ -218,7 +263,7 @@ trie_add(Topic) when is_binary(Topic) -> [TrieNode=#topic_trie_node{topic=undefined}] -> mnesia:write(TrieNode#topic_trie_node{topic=Topic}); [#topic_trie_node{topic=Topic}] -> - ignore; + {atomic, already_exist}; [] -> %add trie path [trie_add_path(Triple) || Triple <- emqtt_topic:triples(Topic)],