From 5164d0d6a57e3be187d3d5dfee1b605dbd4f688f Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 8 Dec 2018 11:40:08 +0800 Subject: [PATCH] Fix unsubscribe fail and rename shared -> shard --- src/emqx_broker.erl | 43 +++++++++++++++++++------------------- src/emqx_broker_helper.erl | 12 +++++------ 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index b9332ffed..6556a59e5 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -97,19 +97,19 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), case maps:get(share, SubOpts, undefined) of undefined -> - Shared = emqx_broker_helper:get_shared(SubPid, Topic), - case Shared of + Shard = emqx_broker_helper:get_shard(SubPid, Topic), + case Shard of 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}); I -> - true = ets:insert(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), - true = ets:insert(?SUBSCRIBER, {Topic, {shared, I}}) + true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}) end, - SubOpts1 = maps:put(shared, Shared, SubOpts), + SubOpts1 = maps:put(shard, Shard, SubOpts), true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}), - call(pick({Topic, Shared}), {subscribe, Topic}); - Group -> %% Shared subscription + call(pick({Topic, Shard}), {subscribe, Topic}); + Group -> %% Shard subscription true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), - emqx_shared_sub:subscribe(Group, Topic, SubPid) + emqx_shard_sub:subscribe(Group, Topic, SubPid) end; true -> ok end. @@ -126,15 +126,15 @@ unsubscribe(Topic) when is_binary(Topic) -> _ = emqx_broker_helper:reclaim_seq(Topic), case maps:get(share, SubOpts, undefined) of undefined -> - case maps:get(shared, SubOpts, 0) of + case maps:get(shard, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), ok = cast(pick(Topic), {unsubscribed, Topic}); I -> - true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), - case ets:member(emqx_subscriber, {shared, Topic, I}) of + true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + case ets:member(emqx_subscriber, {shard, Topic, I}) of true -> ok; - false -> ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}) + false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}) end, ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end; @@ -143,7 +143,8 @@ unsubscribe(Topic) when is_binary(Topic) -> end, true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), %%true = ets:delete_object(?SUBID, {SubId, SubPid}), - true = ets:delete(?SUBOPTION, {SubPid, Topic}); + true = ets:delete(?SUBOPTION, {SubPid, Topic}), + ok; [] -> ok end. @@ -246,11 +247,11 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> SubPid ! {dispatch, Topic, Msg}; -dispatch({shared, I}, Topic, Msg) -> +dispatch({shard, I}, Topic, Msg) -> lists:foreach(fun(SubPid) -> SubPid ! {dispatch, Topic, Msg} - end, safe_lookup_element(?SUBSCRIBER, {shared, Topic, I}, [])). + end, safe_lookup_element(?SUBSCRIBER, {shard, Topic, I}, [])). inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; @@ -272,15 +273,15 @@ subscriber_down(SubPid) -> case ets:lookup(?SUBOPTION, Sub) of [{_, SubOpts}] -> _ = emqx_broker_helper:reclaim_seq(Topic), - case maps:get(shared, SubOpts, 0) of + case maps:get(shard, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), ok = cast(pick(Topic), {unsubscribed, Topic}); I -> - true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), - case ets:member(emqx_subscriber, {shared, Topic, I}) of + true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + case ets:member(emqx_subscriber, {shard, Topic, I}) of true -> ok; - false -> ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}) + false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}) end, ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end, @@ -388,9 +389,9 @@ handle_cast({unsubscribed, Topic}, State) -> {noreply, State}; handle_cast({unsubscribed, Topic, I}, State) -> - case ets:member(?SUBSCRIBER, {shared, Topic, I}) of + case ets:member(?SUBSCRIBER, {shard, Topic, I}) of false -> - true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}), + true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), cast(pick(Topic), {unsubscribed, Topic}); true -> ok end, diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index 6830b4d32..d3e7f9d37 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -20,7 +20,7 @@ -export([start_link/0]). -export([monitor/2]). --export([get_shared/2]). +-export([get_shard/2]). -export([create_seq/1, reclaim_seq/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -47,11 +47,11 @@ monitor(SubPid, SubId) when is_pid(SubPid) -> error(subid_conflict) end. --spec(get_shared(pid(), emqx_topic:topic()) -> non_neg_integer()). -get_shared(SubPid, Topic) -> +-spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). +get_shard(SubPid, Topic) -> case create_seq(Topic) of Seq when Seq =< 1024 -> 0; - _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shareds, 2)) + _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2)) end. -spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). @@ -69,8 +69,8 @@ reclaim_seq(Topic) -> init([]) -> %% SubSeq: Topic -> SeqId ok = emqx_sequence:create(?SUBSEQ), - %% Shareds: CPU * 32 - true = ets:insert(?SUBSEQ, {shareds, emqx_vm:schedulers() * 32}), + %% Shards: CPU * 32 + true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}), %% SubMon: SubPid -> SubId ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]), %% Stats timer