Fix unsubscribe fail and rename shared -> shard
This commit is contained in:
parent
d1be51d398
commit
5164d0d6a5
|
@ -97,19 +97,19 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map
|
||||||
true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
|
true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
|
||||||
case maps:get(share, SubOpts, undefined) of
|
case maps:get(share, SubOpts, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
Shared = emqx_broker_helper:get_shared(SubPid, Topic),
|
Shard = emqx_broker_helper:get_shard(SubPid, Topic),
|
||||||
case Shared of
|
case Shard of
|
||||||
0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid});
|
0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid});
|
||||||
I ->
|
I ->
|
||||||
true = ets:insert(?SUBSCRIBER, {{shared, Topic, I}, SubPid}),
|
true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
||||||
true = ets:insert(?SUBSCRIBER, {Topic, {shared, I}})
|
true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}})
|
||||||
end,
|
end,
|
||||||
SubOpts1 = maps:put(shared, Shared, SubOpts),
|
SubOpts1 = maps:put(shard, Shard, SubOpts),
|
||||||
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}),
|
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}),
|
||||||
call(pick({Topic, Shared}), {subscribe, Topic});
|
call(pick({Topic, Shard}), {subscribe, Topic});
|
||||||
Group -> %% Shared subscription
|
Group -> %% Shard subscription
|
||||||
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
||||||
emqx_shared_sub:subscribe(Group, Topic, SubPid)
|
emqx_shard_sub:subscribe(Group, Topic, SubPid)
|
||||||
end;
|
end;
|
||||||
true -> ok
|
true -> ok
|
||||||
end.
|
end.
|
||||||
|
@ -126,15 +126,15 @@ unsubscribe(Topic) when is_binary(Topic) ->
|
||||||
_ = emqx_broker_helper:reclaim_seq(Topic),
|
_ = emqx_broker_helper:reclaim_seq(Topic),
|
||||||
case maps:get(share, SubOpts, undefined) of
|
case maps:get(share, SubOpts, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
case maps:get(shared, SubOpts, 0) of
|
case maps:get(shard, SubOpts, 0) of
|
||||||
0 ->
|
0 ->
|
||||||
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
||||||
ok = cast(pick(Topic), {unsubscribed, Topic});
|
ok = cast(pick(Topic), {unsubscribed, Topic});
|
||||||
I ->
|
I ->
|
||||||
true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}),
|
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
||||||
case ets:member(emqx_subscriber, {shared, Topic, I}) of
|
case ets:member(emqx_subscriber, {shard, Topic, I}) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}})
|
false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}})
|
||||||
end,
|
end,
|
||||||
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
||||||
end;
|
end;
|
||||||
|
@ -143,7 +143,8 @@ unsubscribe(Topic) when is_binary(Topic) ->
|
||||||
end,
|
end,
|
||||||
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
||||||
%%true = ets:delete_object(?SUBID, {SubId, SubPid}),
|
%%true = ets:delete_object(?SUBID, {SubId, SubPid}),
|
||||||
true = ets:delete(?SUBOPTION, {SubPid, Topic});
|
true = ets:delete(?SUBOPTION, {SubPid, Topic}),
|
||||||
|
ok;
|
||||||
[] -> ok
|
[] -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -246,11 +247,11 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
|
||||||
|
|
||||||
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
|
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
|
||||||
SubPid ! {dispatch, Topic, Msg};
|
SubPid ! {dispatch, Topic, Msg};
|
||||||
dispatch({shared, I}, Topic, Msg) ->
|
dispatch({shard, I}, Topic, Msg) ->
|
||||||
|
|
||||||
lists:foreach(fun(SubPid) ->
|
lists:foreach(fun(SubPid) ->
|
||||||
SubPid ! {dispatch, Topic, Msg}
|
SubPid ! {dispatch, Topic, Msg}
|
||||||
end, safe_lookup_element(?SUBSCRIBER, {shared, Topic, I}, [])).
|
end, safe_lookup_element(?SUBSCRIBER, {shard, Topic, I}, [])).
|
||||||
|
|
||||||
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
|
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -272,15 +273,15 @@ subscriber_down(SubPid) ->
|
||||||
case ets:lookup(?SUBOPTION, Sub) of
|
case ets:lookup(?SUBOPTION, Sub) of
|
||||||
[{_, SubOpts}] ->
|
[{_, SubOpts}] ->
|
||||||
_ = emqx_broker_helper:reclaim_seq(Topic),
|
_ = emqx_broker_helper:reclaim_seq(Topic),
|
||||||
case maps:get(shared, SubOpts, 0) of
|
case maps:get(shard, SubOpts, 0) of
|
||||||
0 ->
|
0 ->
|
||||||
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
||||||
ok = cast(pick(Topic), {unsubscribed, Topic});
|
ok = cast(pick(Topic), {unsubscribed, Topic});
|
||||||
I ->
|
I ->
|
||||||
true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}),
|
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
||||||
case ets:member(emqx_subscriber, {shared, Topic, I}) of
|
case ets:member(emqx_subscriber, {shard, Topic, I}) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}})
|
false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}})
|
||||||
end,
|
end,
|
||||||
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
||||||
end,
|
end,
|
||||||
|
@ -388,9 +389,9 @@ handle_cast({unsubscribed, Topic}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast({unsubscribed, Topic, I}, State) ->
|
handle_cast({unsubscribed, Topic, I}, State) ->
|
||||||
case ets:member(?SUBSCRIBER, {shared, Topic, I}) of
|
case ets:member(?SUBSCRIBER, {shard, Topic, I}) of
|
||||||
false ->
|
false ->
|
||||||
true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}),
|
true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}),
|
||||||
cast(pick(Topic), {unsubscribed, Topic});
|
cast(pick(Topic), {unsubscribed, Topic});
|
||||||
true -> ok
|
true -> ok
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
-export([monitor/2]).
|
-export([monitor/2]).
|
||||||
-export([get_shared/2]).
|
-export([get_shard/2]).
|
||||||
-export([create_seq/1, reclaim_seq/1]).
|
-export([create_seq/1, reclaim_seq/1]).
|
||||||
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
|
@ -47,11 +47,11 @@ monitor(SubPid, SubId) when is_pid(SubPid) ->
|
||||||
error(subid_conflict)
|
error(subid_conflict)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(get_shared(pid(), emqx_topic:topic()) -> non_neg_integer()).
|
-spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()).
|
||||||
get_shared(SubPid, Topic) ->
|
get_shard(SubPid, Topic) ->
|
||||||
case create_seq(Topic) of
|
case create_seq(Topic) of
|
||||||
Seq when Seq =< 1024 -> 0;
|
Seq when Seq =< 1024 -> 0;
|
||||||
_Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shareds, 2))
|
_Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()).
|
-spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()).
|
||||||
|
@ -69,8 +69,8 @@ reclaim_seq(Topic) ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
%% SubSeq: Topic -> SeqId
|
%% SubSeq: Topic -> SeqId
|
||||||
ok = emqx_sequence:create(?SUBSEQ),
|
ok = emqx_sequence:create(?SUBSEQ),
|
||||||
%% Shareds: CPU * 32
|
%% Shards: CPU * 32
|
||||||
true = ets:insert(?SUBSEQ, {shareds, emqx_vm:schedulers() * 32}),
|
true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}),
|
||||||
%% SubMon: SubPid -> SubId
|
%% SubMon: SubPid -> SubId
|
||||||
ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]),
|
ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]),
|
||||||
%% Stats timer
|
%% Stats timer
|
||||||
|
|
Loading…
Reference in New Issue