Fix share_sub disconnect not clear route bug
This commit is contained in:
parent
af21cdfd1b
commit
9711892f73
|
@ -399,19 +399,20 @@ do_unsubscribe(Group, Topic, Subscriber) ->
|
||||||
ets:delete(?SUBOPTION, {Topic, Subscriber}).
|
ets:delete(?SUBOPTION, {Topic, Subscriber}).
|
||||||
|
|
||||||
subscriber_down(Subscriber) ->
|
subscriber_down(Subscriber) ->
|
||||||
Topics = lists:map(fun({_, {share, _, Topic}}) ->
|
Topics = lists:map(fun({_, {share, Group, Topic}}) ->
|
||||||
Topic;
|
{Topic, Group};
|
||||||
({_, Topic}) ->
|
({_, Topic}) ->
|
||||||
Topic
|
{Topic, undefined}
|
||||||
end, ets:lookup(?SUBSCRIPTION, Subscriber)),
|
end, ets:lookup(?SUBSCRIPTION, Subscriber)),
|
||||||
lists:foreach(fun(Topic) ->
|
lists:foreach(fun({Topic, undefined}) ->
|
||||||
case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
|
true = do_unsubscribe(undefined, Topic, Subscriber),
|
||||||
[{_, SubOpts}] ->
|
ets:member(?SUBSCRIBER, Topic) orelse emqx_router:del_route(Topic, dest(undefined));
|
||||||
Group = maps:get(share, SubOpts, undefined),
|
({Topic, Group}) ->
|
||||||
true = do_unsubscribe(Group, Topic, Subscriber),
|
true = do_unsubscribe(Group, Topic, Subscriber),
|
||||||
ets:member(?SUBSCRIBER, Topic)
|
Groups = groups(Topic),
|
||||||
orelse emqx_router:del_route(Topic, dest(Group));
|
case lists:member(Group, lists:usort(Groups)) of
|
||||||
[] -> ok
|
true -> ok;
|
||||||
|
false -> emqx_router:del_route(Topic, dest(Group))
|
||||||
end
|
end
|
||||||
end, Topics).
|
end, Topics).
|
||||||
|
|
||||||
|
@ -430,3 +431,9 @@ dest(Group) -> {Group, node()}.
|
||||||
shared(undefined, Name) -> Name;
|
shared(undefined, Name) -> Name;
|
||||||
shared(Group, Name) -> {share, Group, Name}.
|
shared(Group, Name) -> {share, Group, Name}.
|
||||||
|
|
||||||
|
groups(Topic) ->
|
||||||
|
lists:foldl(fun({_, {share, Group, _}}, Acc) ->
|
||||||
|
[Group | Acc];
|
||||||
|
({_, _}, Acc) ->
|
||||||
|
Acc
|
||||||
|
end, [], ets:lookup(?SUBSCRIBER, Topic)).
|
||||||
|
|
|
@ -167,6 +167,13 @@ handle_cast({del_route, From, Route}, State) ->
|
||||||
_ = gen_server:reply(From, ok),
|
_ = gen_server:reply(From, ok),
|
||||||
{noreply, NewState};
|
{noreply, NewState};
|
||||||
|
|
||||||
|
handle_cast({del_route, Route = #route{topic = Topic, dest = Dest}}, State) when is_tuple(Dest) ->
|
||||||
|
{noreply, case emqx_topic:wildcard(Topic) of
|
||||||
|
true -> log(trans(fun del_trie_route/1, [Route])),
|
||||||
|
State;
|
||||||
|
false -> del_direct_route(Route, State)
|
||||||
|
end};
|
||||||
|
|
||||||
handle_cast({del_route, Route = #route{topic = Topic}}, State) ->
|
handle_cast({del_route, Route = #route{topic = Topic}}, State) ->
|
||||||
%% Confirm if there are still subscribers...
|
%% Confirm if there are still subscribers...
|
||||||
{noreply, case ets:member(emqx_subscriber, Topic) of
|
{noreply, case ets:member(emqx_subscriber, Topic) of
|
||||||
|
|
Loading…
Reference in New Issue