chore: leave comment describing need for tighter synchronization
This commit is contained in:
parent
b9627c420f
commit
5c91984ad7
|
@ -159,6 +159,14 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) ->
|
||||||
%% https://emqx.atlassian.net/browse/EMQX-10214
|
%% https://emqx.atlassian.net/browse/EMQX-10214
|
||||||
I = emqx_broker_helper:get_sub_shard(SubPid, Topic),
|
I = emqx_broker_helper:get_sub_shard(SubPid, Topic),
|
||||||
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}),
|
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}),
|
||||||
|
%% NOTE
|
||||||
|
%% We are relying on the local state to minimize global routing state changes,
|
||||||
|
%% thus it's important that some operations on ETS tables on the same topic
|
||||||
|
%% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of
|
||||||
|
%% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of
|
||||||
|
%% `unsubscribe` codepath. So we have to pick a worker according to the topic,
|
||||||
|
%% but not shard. If there are topics with high number of shards, then the
|
||||||
|
%% load across the pool will be unbalanced.
|
||||||
call(pick(Topic), {subscribe, Topic, SubPid, I});
|
call(pick(Topic), {subscribe, Topic, SubPid, I});
|
||||||
do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
|
do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
|
||||||
is_binary(RealTopic)
|
is_binary(RealTopic)
|
||||||
|
@ -204,6 +212,14 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when
|
||||||
0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts);
|
0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts);
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end,
|
end,
|
||||||
|
%% NOTE
|
||||||
|
%% We are relying on the local state to minimize global routing state changes,
|
||||||
|
%% thus it's important that some operations on ETS tables on the same topic
|
||||||
|
%% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of
|
||||||
|
%% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of
|
||||||
|
%% `unsubscribe` codepath. So we have to pick a worker according to the topic,
|
||||||
|
%% but not shard. If there are topics with high number of shards, then the
|
||||||
|
%% load across the pool will be unbalanced.
|
||||||
cast(pick(Topic), {unsubscribed, Topic, SubPid, I});
|
cast(pick(Topic), {unsubscribed, Topic, SubPid, I});
|
||||||
do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when
|
do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when
|
||||||
is_binary(Group), is_binary(Topic), is_pid(SubPid)
|
is_binary(Group), is_binary(Topic), is_pid(SubPid)
|
||||||
|
|
Loading…
Reference in New Issue