rename shard shared

This commit is contained in:
turtled 2018-12-08 09:56:00 +08:00
parent 36e7d63d66
commit 5e53eaeee5
2 changed files with 22 additions and 19 deletions

View File

@ -71,7 +71,7 @@ create_tabs() ->
%% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ... %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ...
%% duplicate_bag: o(1) insert %% duplicate_bag: o(1) insert
ok = emqx_tables:new(?SUBSCRIBER, [duplicate_bag | TabOpts]). ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Subscribe API %% Subscribe API
@ -97,15 +97,16 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map
true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
case maps:get(share, SubOpts, undefined) of case maps:get(share, SubOpts, undefined) of
undefined -> undefined ->
Shard = emqx_broker_helper:get_shard(SubPid, Topic), Shared = emqx_broker_helper:get_shared(SubPid, Topic),
case Shard of case Shared of
0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}); 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid});
I -> true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), I ->
true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}) true = ets:insert(?SUBSCRIBER, {{shared, Topic, I}, SubPid}),
true = ets:insert(?SUBSCRIBER, {Topic, {shared, I}})
end, end,
SubOpts1 = maps:put(shard, Shard, SubOpts), SubOpts1 = maps:put(shared, Shared, SubOpts),
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}), true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}),
call(pick({Topic, Shard}), {subscribe, Topic}); call(pick({Topic, Shared}), {subscribe, Topic});
Group -> %% Shared subscription Group -> %% Shared subscription
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
emqx_shared_sub:subscribe(Group, Topic, SubPid) emqx_shared_sub:subscribe(Group, Topic, SubPid)
@ -128,7 +129,7 @@ unsubscribe(Topic) when is_binary(Topic) ->
case maps:get(shared, SubOpts, 0) of case maps:get(shared, SubOpts, 0) of
0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
ok = cast(pick(Topic), {unsubscribed, Topic}); ok = cast(pick(Topic), {unsubscribed, Topic});
I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), I -> true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}),
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
end; end;
Group -> Group ->
@ -239,10 +240,11 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
SubPid ! {dispatch, Topic, Msg}; SubPid ! {dispatch, Topic, Msg};
dispatch({shard, I}, Topic, Msg) -> dispatch({shared, I}, Topic, Msg) ->
lists:foreach(fun(SubPid) -> lists:foreach(fun(SubPid) ->
SubPid ! {dispatch, Topic, Msg} SubPid ! {dispatch, Topic, Msg}
end, safe_lookup_element(?SUBSCRIBER, {share, Topic, I}, [])). end, safe_lookup_element(?SUBSCRIBER, {shared, Topic, I}, [])).
inc_dropped_cnt(<<"$SYS/", _/binary>>) -> inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
ok; ok;
@ -267,7 +269,8 @@ subscriber_down(SubPid) ->
case maps:get(shared, SubOpts, 0) of case maps:get(shared, SubOpts, 0) of
0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
ok = cast(pick(Topic), {unsubscribed, Topic}); ok = cast(pick(Topic), {unsubscribed, Topic});
I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), I -> true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}),
true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}),
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
end; end;
[] -> ok [] -> ok
@ -373,9 +376,9 @@ handle_cast({unsubscribed, Topic}, State) ->
{noreply, State}; {noreply, State};
handle_cast({unsubscribed, Topic, I}, State) -> handle_cast({unsubscribed, Topic, I}, State) ->
case ets:member(?SUBSCRIBER, {shard, Topic, I}) of case ets:member(?SUBSCRIBER, {shared, Topic, I}) of
false -> false ->
true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}),
cast(pick(Topic), {unsubscribed, Topic}); cast(pick(Topic), {unsubscribed, Topic});
true -> ok true -> ok
end, end,

View File

@ -20,7 +20,7 @@
-export([start_link/0]). -export([start_link/0]).
-export([monitor/2]). -export([monitor/2]).
-export([get_shard/2]). -export([get_shared/2]).
-export([create_seq/1, reclaim_seq/1]). -export([create_seq/1, reclaim_seq/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@ -47,11 +47,11 @@ monitor(SubPid, SubId) when is_pid(SubPid) ->
error(subid_conflict) error(subid_conflict)
end. end.
-spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). -spec(get_shared(pid(), emqx_topic:topic()) -> non_neg_integer()).
get_shard(SubPid, Topic) -> get_shared(SubPid, Topic) ->
case create_seq(Topic) of case create_seq(Topic) of
Seq when Seq =< 1024 -> 0; Seq when Seq =< 1024 -> 0;
_Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2)) _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shareds, 2))
end. end.
-spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). -spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()).
@ -69,8 +69,8 @@ reclaim_seq(Topic) ->
init([]) -> init([]) ->
%% SubSeq: Topic -> SeqId %% SubSeq: Topic -> SeqId
ok = emqx_sequence:create(?SUBSEQ), ok = emqx_sequence:create(?SUBSEQ),
%% Shards: CPU * 32 %% Shareds: CPU * 32
true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}), true = ets:insert(?SUBSEQ, {shareds, emqx_vm:schedulers() * 32}),
%% SubMon: SubPid -> SubId %% SubMon: SubPid -> SubId
ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]), ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]),
%% Stats timer %% Stats timer