From 5e53eaeee5b65d2d1ec0c0721d5cdddc311527f4 Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 8 Dec 2018 09:56:00 +0800 Subject: [PATCH] rename shard shared --- src/emqx_broker.erl | 29 ++++++++++++++++------------- src/emqx_broker_helper.erl | 12 ++++++------ 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 0a9264489..380e16c42 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -71,7 +71,7 @@ create_tabs() -> %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ... %% duplicate_bag: o(1) insert - ok = emqx_tables:new(?SUBSCRIBER, [duplicate_bag | TabOpts]). + ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]). %%------------------------------------------------------------------------------ %% Subscribe API @@ -97,15 +97,16 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), case maps:get(share, SubOpts, undefined) of undefined -> - Shard = emqx_broker_helper:get_shard(SubPid, Topic), - case Shard of + Shared = emqx_broker_helper:get_shared(SubPid, Topic), + case Shared of 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}); - I -> true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}) + I -> + true = ets:insert(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), + true = ets:insert(?SUBSCRIBER, {Topic, {shared, I}}) end, - SubOpts1 = maps:put(shard, Shard, SubOpts), + SubOpts1 = maps:put(shared, Shared, SubOpts), true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}), - call(pick({Topic, Shard}), {subscribe, Topic}); + call(pick({Topic, Shared}), {subscribe, Topic}); Group -> %% Shared subscription true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), emqx_shared_sub:subscribe(Group, Topic, SubPid) @@ -128,7 +129,7 @@ unsubscribe(Topic) when is_binary(Topic) -> case maps:get(shared, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + I -> true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end; Group -> @@ -239,10 +240,11 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> SubPid ! {dispatch, Topic, Msg}; -dispatch({shard, I}, Topic, Msg) -> +dispatch({shared, I}, Topic, Msg) -> + lists:foreach(fun(SubPid) -> SubPid ! {dispatch, Topic, Msg} - end, safe_lookup_element(?SUBSCRIBER, {share, Topic, I}, [])). + end, safe_lookup_element(?SUBSCRIBER, {shared, Topic, I}, [])). inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; @@ -267,7 +269,8 @@ subscriber_down(SubPid) -> case maps:get(shared, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + I -> true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}), + true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end; [] -> ok @@ -373,9 +376,9 @@ handle_cast({unsubscribed, Topic}, State) -> {noreply, State}; handle_cast({unsubscribed, Topic, I}, State) -> - case ets:member(?SUBSCRIBER, {shard, Topic, I}) of + case ets:member(?SUBSCRIBER, {shared, Topic, I}) of false -> - true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), + true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}), cast(pick(Topic), {unsubscribed, Topic}); true -> ok end, diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index d3e7f9d37..6830b4d32 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -20,7 +20,7 @@ -export([start_link/0]). -export([monitor/2]). --export([get_shard/2]). +-export([get_shared/2]). -export([create_seq/1, reclaim_seq/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -47,11 +47,11 @@ monitor(SubPid, SubId) when is_pid(SubPid) -> error(subid_conflict) end. --spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). -get_shard(SubPid, Topic) -> +-spec(get_shared(pid(), emqx_topic:topic()) -> non_neg_integer()). +get_shared(SubPid, Topic) -> case create_seq(Topic) of Seq when Seq =< 1024 -> 0; - _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2)) + _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shareds, 2)) end. -spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). @@ -69,8 +69,8 @@ reclaim_seq(Topic) -> init([]) -> %% SubSeq: Topic -> SeqId ok = emqx_sequence:create(?SUBSEQ), - %% Shards: CPU * 32 - true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}), + %% Shareds: CPU * 32 + true = ets:insert(?SUBSEQ, {shareds, emqx_vm:schedulers() * 32}), %% SubMon: SubPid -> SubId ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]), %% Stats timer