diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 5f3e63059..b9332ffed 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -127,10 +127,16 @@ unsubscribe(Topic) when is_binary(Topic) -> case maps:get(share, SubOpts, undefined) of undefined -> case maps:get(shared, SubOpts, 0) of - 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), - ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) + 0 -> + true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = cast(pick(Topic), {unsubscribed, Topic}); + I -> + true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), + case ets:member(emqx_subscriber, {shared, Topic, I}) of + true -> ok; + false -> ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}) + end, + ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end; Group -> ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid) @@ -267,11 +273,16 @@ subscriber_down(SubPid) -> [{_, SubOpts}] -> _ = emqx_broker_helper:reclaim_seq(Topic), case maps:get(shared, SubOpts, 0) of - 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}), - true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), - ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) + 0 -> + true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = cast(pick(Topic), {unsubscribed, Topic}); + I -> + true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), + case ets:member(emqx_subscriber, {shared, Topic, I}) of + true -> ok; + false -> ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}) + end, + ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end, ets:delete(?SUBOPTION, Sub); [] -> ok