diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index b52037d29..b89355c76 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_pubsub). -author("Feng Lee "). @@ -217,6 +218,7 @@ match(Topic) when is_binary(Topic) -> init([Id, _Opts]) -> process_flag(min_heap_size, 1024*1024), gproc_pool:connect_worker(pubsub, {?MODULE, Id}), + %%TODO: gb_trees to replace maps? {ok, #state{id = Id, submap = maps:new()}}. handle_call({subscribe, SubPid, Topics}, _From, State) -> @@ -384,9 +386,24 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) -> end end. -add_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) -> +%% Fix issue #53 - Remove Overlapping Subscriptions +add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}}) + when is_record(TopicR, mqtt_topic) -> case add_topic(TopicR) of ok -> + OverlapSubs = [Sub || Sub = #mqtt_subscriber{topic = SubTopic, qos = SubQos} + <- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.pid), + SubTopic =:= Topic, SubQos =/= Qos], + + %% remove overlapping subscribers + if + length(OverlapSubs) =:= 0 -> ok; + true -> + lager:warning("Remove overlapping subscribers: ~p", [OverlapSubs]), + [mnesia:delete_object(subscriber, OverlapSub, write) || OverlapSub <- OverlapSubs] + end, + + %% insert subscriber mnesia:write(subscriber, Subscriber, write); Error -> Error