diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 103e358f6..856a3f7ae 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -159,6 +159,14 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% https://emqx.atlassian.net/browse/EMQX-10214 I = emqx_broker_helper:get_sub_shard(SubPid, Topic), true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}), + %% NOTE + %% We are relying on the local state to minimize global routing state changes, + %% thus it's important that some operations on ETS tables on the same topic + %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of + %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of + %% `unsubscribe` codepath. So we have to pick a worker according to the topic, + %% but not shard. If there are topics with high number of shards, then the + %% load across the pool will be unbalanced. call(pick(Topic), {subscribe, Topic, SubPid, I}); do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when is_binary(RealTopic) @@ -204,6 +212,14 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when 0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts); _ -> ok end, + %% NOTE + %% We are relying on the local state to minimize global routing state changes, + %% thus it's important that some operations on ETS tables on the same topic + %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of + %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of + %% `unsubscribe` codepath. So we have to pick a worker according to the topic, + %% but not shard. If there are topics with high number of shards, then the + %% load across the pool will be unbalanced. cast(pick(Topic), {unsubscribed, Topic, SubPid, I}); do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when is_binary(Group), is_binary(Topic), is_pid(SubPid)