diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 560c095cf..17d88878d 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -399,20 +399,21 @@ do_unsubscribe(Group, Topic, Subscriber) -> ets:delete(?SUBOPTION, {Topic, Subscriber}). subscriber_down(Subscriber) -> - Topics = lists:map(fun({_, {share, _, Topic}}) -> - Topic; + Topics = lists:map(fun({_, {share, Group, Topic}}) -> + {Topic, Group}; ({_, Topic}) -> - Topic + {Topic, undefined} end, ets:lookup(?SUBSCRIPTION, Subscriber)), - lists:foreach(fun(Topic) -> - case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of - [{_, SubOpts}] -> - Group = maps:get(share, SubOpts, undefined), - true = do_unsubscribe(Group, Topic, Subscriber), - ets:member(?SUBSCRIBER, Topic) - orelse emqx_router:del_route(Topic, dest(Group)); - [] -> ok - end + lists:foreach(fun({Topic, undefined}) -> + true = do_unsubscribe(undefined, Topic, Subscriber), + ets:member(?SUBSCRIBER, Topic) orelse emqx_router:del_route(Topic, dest(undefined)); + ({Topic, Group}) -> + true = do_unsubscribe(Group, Topic, Subscriber), + Groups = groups(Topic), + case lists:member(Group, lists:usort(Groups)) of + true -> ok; + false -> emqx_router:del_route(Topic, dest(Group)) + end end, Topics). monitor_subscriber({SubPid, SubId}, State = #state{submap = SubMap, submon = SubMon}) -> @@ -430,3 +431,9 @@ dest(Group) -> {Group, node()}. shared(undefined, Name) -> Name; shared(Group, Name) -> {share, Group, Name}. +groups(Topic) -> + lists:foldl(fun({_, {share, Group, _}}, Acc) -> + [Group | Acc]; + ({_, _}, Acc) -> + Acc + end, [], ets:lookup(?SUBSCRIBER, Topic)). diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 863214617..df2d2e018 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -167,6 +167,13 @@ handle_cast({del_route, From, Route}, State) -> _ = gen_server:reply(From, ok), {noreply, NewState}; +handle_cast({del_route, Route = #route{topic = Topic, dest = Dest}}, State) when is_tuple(Dest) -> + {noreply, case emqx_topic:wildcard(Topic) of + true -> log(trans(fun del_trie_route/1, [Route])), + State; + false -> del_direct_route(Route, State) + end}; + handle_cast({del_route, Route = #route{topic = Topic}}, State) -> %% Confirm if there are still subscribers... {noreply, case ets:member(emqx_subscriber, Topic) of